SpringCloud技术栈-相关的源码详解笔记

Auto Configuration

@Enable* 注解

@SpringBootApplication注解带入了@EnableAutoConfiguration
@EnableAutoConfiguration又带有@Import(EnableAutoConfigurationImportSelector.class)

使用了Spring Core包的SpringFactoriesLoader类的loadFactoryNamesof()方法。
SpringFactoriesLoader会查询META-INF/spring.factories文件中包含的JAR文件。

DeferredImportSelector 接口

1
2
3
4
5
6
7
8
sequenceDiagram

SpringApplication->>SpringApplication:run(args);
SpringApplication->>SpringApplication:refreshContext
SpringApplication->>AbstractApplicationContext:refresh
AbstractApplicationContext->>AbstractApplicationContext:invokeBeanFactoryPostProcessors
AbstractApplicationContext->>PostProcessorRegistrationDelegate:invokeBeanFactoryPostProcessors
PostProcessorRegistrationDelegate->>postProcessor:postProcessBeanDefinitionRegistry
1
2
3
4
5
6
sequenceDiagram

ConfigurationClassPostProcessor->>ConfigurationClassPostProcessor:processConfigBeanDefinitions;
ConfigurationClassPostProcessor->>ConfigurationClassParser:parse
ConfigurationClassParser->>ConfigurationClassParser:processDeferredImportSelectors
ConfigurationClassParser->>DeferredImportSelector:selectImports

以上逻辑就实现了DeferredImportSelector,才得以重写的selectImports起生效

getCandidateConfigurations利用了SpringFactoriesLoader.loadFactoryNames寻找相关的”META-INF/spring.factories”


属性映射

1
2
3
4
5
6
7
8
9
10
@ConfigurationProperties(prefix = "spring.data.mongodb")
public class MongoProperties {

private String host;
private int port = DBPort.PORT;
private String uri = "mongodb://localhost/test";
private String database;

// ... getters/ setters omitted
}

@ConfigurationProperties注释将POJO关联到指定前缀的每一个属性。
例如:spring.data.mongodb.port属性将映射到这个类的端口属性。


@Conditional 注解

以下各个注解有很大帮助实现自动配置功能

  • @ConditionalOnBean
  • @ConditionalOnClass
  • @ConditionalOnExpression
  • @ConditionalOnMissingBean
  • @ConditionalOnMissingClass
  • @ConditionalOnNotWebApplication
  • @ConditionalOnResource
  • @ConditionalOnWebApplication




Eureka


概述

  • 集群重要类:PeerAwareInstanceRegistryImpl
  • 新的Eureka Server节点加入集群后的影响
  • 新服务注册(Register)注册时的影响
  • 服务心跳(renew)
  • 服务下线和剔除
  • 自我保护模式

Eureka Server的集群同步操作

Eureka官网的架构图,下方的操作需要结合下图理解:

PeerAwareInstanceRegistryImpl

集群相关重要的类com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl:为了保证集群里所有Eureka Server节点的状态同步
所有以下操作都会同步到集群的所有服务上:服务注册(Registers)、服务更新(Renewals)、服务取消(Cancels),服务超时(Expirations)和服务状态变更(Status Changes)。

以下是一些部分方法:

  • syncUp:在Eureka Server重启或新的Eureka Server节点加进来的,会执行初始化,从集群其他节点中获取所有的实例注册信息,从而能够正常提供服务。当Eureka - Server启动时,它会从其它节点获取所有的注册信息,如果获取同步失败,它在一定时间(此值由决定)内拒绝服务。
  • replicateToPeers: 同步以下操作到所有的集群节点:服务注册(Registers)、服务更新(Renewals)、服务取消(Cancels),服务超时(Expirations)和服务状态变更(Status Changes)
  • register:注册实例,并且复印此实例的信息到所有的eureka server的节点。如果其它Eureka Server调用此节点,只在本节点更新实例信息,避免通知其他节点执行更新
  • renew:心跳,同步集群
  • cancel
  • 其他

Eureka Server集群之间的状态是采用异步方式同步的,所以不保证节点间的状态一定是一致的,不过基本能保证最终状态是一致的。

新的Eureka Server节点加入集群后的影响

当有新的节点加入到集群中,会对现在Eureka Server和Eureka Client有什么影响以及他们如何发现新增的Eureka Server节点:

  • 新增的Eureka Server:在Eureka Server重启或新的Eureka Server节点加进来的,它会从集群里其它节点获取所有的实例注册信息。如果获取同步失败,它会在一定时间(此值由决定eureka.server.peer-eureka-nodes-update-interval-ms决定)内拒绝服务。
  • 已有Eureka Server和Service Consumer如何发现新的Eureka Server

    • 已有的Eureka Server:在运行过程中,Eureka Server之间会定时同步实例的注册信息。这样即使新的Application Service只向集群中一台注册服务,则经过一段时间会集群中所有的Eureka Server都会有这个实例的信息。那么Eureka Server节点之间如何相互发现,各个节点之间定时(时间由eureka.server.peer-eureka-nodes-update-interval-ms决定)更新节点信息,进行相互发现。
    • Service Consumer:Service Consumer刚启动时,它会从配置文件读取Eureka Server的地址信息。当集群中新增一个Eureka Server中时,那么Service Provider如何发现这个Eureka Server?Service Consumer会定时(此值由eureka.client.eureka-service-url-poll-interval-seconds决定)调用Eureka Server集群接口,获取所有的Eureka Server信息的并更新本地配置。

新服务注册(Register)注册时的影响

Service Provider要对外提供服务,把自己注册到Eureka Server上。如果配置参数eureka.client.registerWithEureka=true(默认值true)时,会向Eureka Server注册进行注册,Eureka Server会保存注册信息到内存中。

Service Consumer为了避免每次调用服务请求都需要向Eureka Server获取服务实例的注册信息,此时需要设置eureka.client.fetchRegistry=true,它会在本地缓存所有实例注册信息。为了保证缓存数据的有效性,它会定时(值由eureka.client.registry-fetch-interval-seconds定义,默认值为30s)向注册中心更新实例。

服务心跳(renew)

服务实例会通过心跳(eureka.instance.lease-renewal-interval-in-seconds定义心跳的频率,默认值为30s)续约的方式向Eureka Server定时更新自己的状态。Eureka Server收到心跳后,会通知集群里的其它Eureka Server更新此实例的状态。Service Provider/Service Consumer也会定时更新缓存的实例信息。

服务下线和剔除

服务的下线有两种情况:

  • 在Service Provider服务shutdown的时候,主动通知Eureka Server把自己剔除,从而避免客户端调用已经下线的服务。
  • Eureka Server会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认值为0,默认情况不删除实例)进行检查,如果发现实例在在一定时间(此值由eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到心跳,则会注销此实例。

这种情况下,Eureka Client的最多需要[eureka.instance.lease-renewal-interval-in-seconds + eureka.client.registry-fetch-interval-seconds]时间才发现服务已经下线。同理,一个新的服务上线后,Eureka Client的服务消费方最多需要相同的时间才发现服务已经上线

服务下线,同时会更新到Eureka Server其他节点和Eureka client的缓存,流程类似同以上的register过程

自我保护模式

如果Eureka Server最近1分钟收到renew的次数小于阈值(即预期的最小值),则会触发自我保护模式,此时Eureka Server此时会认为这是网络问题,它不会注销任何过期的实例。等到最近收到renew的次数大于阈值后,则Eureka Server退出自我保护模式。

自我保护模式阈值计算:

  • 每个instance的预期心跳数目 = 60/每个instance的心跳间隔秒数
  • 阈值 = 所有注册到服务的instance的数量的预期心跳之和 *自我保护系数

以上的参数都可配置的:

  • instance的心跳间隔秒数:eureka.instance.lease-renewal-interval-in-seconds
  • 自我保护系数:eureka.server.renewal-percent-threshold

如果我们的实例比较少且是内部网络时,推荐关掉此选项。我们也可以通过eureka.server.enable-self-preservation = false来禁用自我保护系数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
server:
port: 10761
spring:
application:
name: cloud-registration-center
## eureka : 主要配置属性在EurekaInstanceConfigBean和EurekaClientConfigBean中
eureka:
instance:
# hostname: 127.0.0.1
# 使用IP注册
preferIpAddress: true
# 心跳间隔
lease-renewal-interval-in-seconds: 3
# 服务失效时间: 如果多久没有收到请求,则可以删除服务
lease-expiration-duration-in-seconds: 7
client:
# 关闭eureka client
# enabled: false
# 注册自身到eureka服务器
registerWithEureka: true
# 表示是否从eureka服务器获取注册信息
fetchRegistry: false
# 客户端从Eureka Server集群里更新Eureka Server信息的频率
eureka-service-url-poll-interval-seconds: 60
# 定义从注册中心获取注册服务的信息
registry-fetch-interval-seconds: 5
# 设置eureka服务器所在的地址,查询服务和注册服务都需要依赖这个地址
serviceUrl:
defaultZone: http://127.0.0.1:10761/eureka/
# 设置eureka服务器所在的地址,可以同时向多个服务注册服务
# defaultZone: http://192.168.21.3:10761/eureka/,http://192.168.21.4:10761/eureka/
server:
# renewal-percent-threshold: 0.1
# 关闭自我保护模式
enable-self-preservation: false
# Eureka Server 自我保护系数,当enable-self-preservation=true时,启作用
# renewal-percent-threshold:
# 设置清理间隔,单位为毫秒,默认为0
eviction-interval-timer-in-ms: 3000
# 设置如果Eureka Server启动时无法从临近Eureka Server节点获取注册信息,它多久不对外提供注册服务
wait-time-in-ms-when-sync-empty: 6000000
# 集群之间相互更新节点信息的时间频率
peer-eureka-nodes-update-interval-ms: 60000




Eureka-Client

EurekaClientAutoConfigurationspring.factories纳入,自动被加载实例bean
EurekaClientAutoConfiguration里面有eurekaClient存在

1
2
3
4
5
sequenceDiagram

EmbeddedWebApplicationContext->>AbstractApplicationContext:super.refresh();
AbstractApplicationContext->>AbstractApplicationContext:finishBeanFactoryInitialization();
AbstractApplicationContext->>AbstractApplicationContext:beanFactory.preInstantiateSingletons();

eurekaClient的实例化Bean中

1
new CloudEurekaClient(manager, config, this.optionalArgs,this.context);

CloudEurekaClient又继承于DiscoveryClient
至此会实例化相关的DiscoveryClient


CloudEurekaClient继承与DiscoveryClient

img

  1. 创建 EurekaInstanceConfig对象
  2. 使用 EurekaInstanceConfig对象 创建 InstanceInfo对象
  3. 使用 EurekaInstanceConfig对象 + InstanceInfo对象 创建
  4. ApplicationInfoManager对象
  5. 创建 EurekaClientConfig对象

使用 ApplicationInfoManager对象 + EurekaClientConfig对象 创建 EurekaClient对象

其中ApplicationInfoManager包括

  • EurekaInstanceConfig
  • InstanceInfo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ApplicationInfoManager {
/**
* 单例
*/
private static ApplicationInfoManager instance = new ApplicationInfoManager(null, null, null);
/**
* 状态变更监听器
*/
protected final Map<String, StatusChangeListener> listeners;
/**
* 应用实例状态匹配
*/
private final InstanceStatusMapper instanceStatusMapper;
/**
* 应用实例信息
*/
private InstanceInfo instanceInfo;
/**
* 应用实例配置
*/
private EurekaInstanceConfig config;

// ... 省略其它构造方法
public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
this.config = config;
this.instanceInfo = instanceInfo;
this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
if (optionalArgs != null) {
this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
} else {
this.instanceStatusMapper = NO_OP_MAPPER;
}
// Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
instance = this;
}

// ... 省略其它方法
}
  • EurekaInstanceConfig,重在应用实例,例如,应用名、应用的端口等等。此处应用指的是,Application Consumer 和 Application Provider。

  • EurekaClientConfig,重在 Eureka-Client,例如, 连接的 Eureka-Server 的地址、获取服务提供者列表的频率、注册自身为服务提供者的频率等等。

初始化线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// DiscoveryClient.java 变量
/**
* 线程池
*
* A scheduler to be used for the following 3 tasks: 【目前只有两个】
* - updating service urls
* - scheduling a TimedSuperVisorTask
*/
private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
/**
* 心跳执行器
*/
private final ThreadPoolExecutor heartbeatExecutor;
/**
* {@link #localRegionApps} 刷新执行器
*/
private final ThreadPoolExecutor cacheRefreshExecutor;



// DiscoveryClient.java 构造方法
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff

scheduler:定时任务线程池,初始化大小为 2,定时给任务,一个给 heartbeatExecutor,一个给 cacheRefreshExecutor

heartbeatExecutor,cacheRefreshExecutor:在提交给scheduler 才声明具体的任务。

初始化定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// DiscoveryClient.java 构造方法
initScheduledTasks();
// DiscoveryClient.java
private void initScheduledTasks() {
// 从 Eureka-Server 拉取注册信息执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}

定时提交任务到Heartbeat线程池,另一个任务到cacheRefresh线程池

初始化 Eureka 网络通信相关

1
2
3
// DiscoveryClient.java 构造方法
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);






Eureka Server

Spring Cloud Eureka Server

1
2
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

自动摄取了EurekaServerAutoConfiguration配置类
里面包含了EurekaServerBootstrap这个Bean




EurekaServerBootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class EurekaBootStrap implements ServletContextListener {
// 省略无关变量和方法
@Override
public void contextInitialized(ServletContextEvent event) {
try {
// 初始化 Eureka-Server 配置环境
initEurekaEnvironment();
// 初始化 Eureka-Server 上下文
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
}

Eureka-Server 配置环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// EurekaBootStrap.java
/**
* 部署环境 - 测服
*/
private static final String TEST = "test";
private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";
private static final String EUREKA_ENVIRONMENT = "eureka.environment";
/**
* 部署数据中心 - CLOUD
*/
private static final String CLOUD = "cloud";
/**
* 部署数据中心 - 默认
*/
private static final String DEFAULT = "default";
private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
private static final String EUREKA_DATACENTER = "eureka.datacenter";
protected void initEurekaEnvironment() throws Exception {
logger.info("Setting the eureka configuration..");
// 设置配置文件的数据中心
String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
if (dataCenter == null) {
logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
} else {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
// 设置配置文件的环境
String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
}
}




EurekaBootStrap

Eureka Client

Eureka-Server 内嵌 Eureka-Client,用于和 Eureka-Server 集群里其他节点通信交互。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ApplicationInfoManager applicationInfoManager;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();

applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}

注册

img

Eureka-Client

发起注册 | 应用实例信息复制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// DiscoveryClient.java
public class DiscoveryClient implements EurekaClient {
/**
* 应用实例状态变更监听器
*/
private ApplicationInfoManager.StatusChangeListener statusChangeListener;
/**
* 应用实例信息复制器
*/
private InstanceInfoReplicator instanceInfoReplicator;
private void initScheduledTasks() {
// ... 省略无关代码

if (clientConfig.shouldRegisterWithEureka()) {

// ... 省略无关代码

// 创建 应用实例信息复制器
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 创建 应用实例状态变更监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
// ... 省略无关代码
};
// 注册 应用实例状态变更监听器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启 应用实例信息复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

}
}

刷新应用实例信息

InstanceInfoReplicator#run函数里面调用

DiscoveryClient#refreshInstanceInfo()方法,刷新应用实例信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void refreshInstanceInfo() {
// 刷新 数据中心信息
applicationInfoManager.refreshDataCenterInfoIfRequired();
// 刷新 租约信息
applicationInfoManager.refreshLeaseInfoIfRequired();
// 健康检查
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}

Eureka-Server

接收注册请求

注册应用实例信息的请求,映射 ApplicationResource#addInstance() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 校验参数是否合法
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// AWS 相关,跳过
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 注册应用实例信息
registry.register(info, "true".equals(isReplication));
// 返回 204 成功
return Response.status(204).build(); // 204 to be backwards compatible
}
}

调用 AbstractInstanceRegistry#register(...) 方法,注册应用实例信息,实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}

// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);

// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}

续租

img

Eureka-Client

Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。
Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。
默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。

初始化定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// ... 省略无关代码
}
// ... 省略无关代码
}


// DiscoveryClient.java
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

// DiscoveryClient.java
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
// 发起注册
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}

Eureka-Server

接收续租请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

// 省略代码
return response;
}


//boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) { // 续租
// Eureka-Server 复制
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}



public boolean renew(String appName, String id, boolean isReplication) {
// 增加 续租次数 到 监控
RENEW.increment(isReplication);
// 获得 租约
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// 租约不存在
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatus(overriddenInstanceStatus);
}
}
// 新增 续租每分钟次数
renewsLastMin.increment();
// 设置 租约最后更新时间(续租)
leaseToRenew.renew();
return true;
}
}






Feign

Feign 涉及到了两个注解,一个是@EnableFeignClients,用来开启 Feign
另一个是@FeignClient,用来标记要用 Feign 来拦截的请求接口。

注解说明

EnableFeignClients 注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
Class<?>[] defaultConfiguration() default {};
Class<?>[] clients() default {};
}


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FeignClient {
@AliasFor("name")
String value() default "";
@Deprecated
String serviceId() default "";
@AliasFor("value")
String name() default "";
String qualifier() default "";
String url() default "";
boolean decode404() default false;
Class<?>[] configuration() default {};
Class<?> fallback() default void.class;
Class<?> fallbackFactory() default void.class;
String path() default "";
boolean primary() default true;
}

使用了@Import(FeignClientsRegistrar.class)
Spring会包含此FeignClientsRegistrar类信息

因实现了ImportBeanDefinitionRegistrar
所以在后期会执行ImportBeanDefinitionRegistrar#registerBeanDefinitions






Feign 是怎么工作的?

  • 第一是为接口定义的每个接口都生成一个实现方法,结果就是 SynchronousMethodHandler 对象。
  • 第二是为该服务接口生成了动态代理。动态代理的实现是 ReflectiveFeign.FeignInvocationHanlder,代理被调用的时候,会根据当前调用的方法,转到对应的SynchronousMethodHandler




当对接口的实例进行请求时(Autowire 的对象是某个ReflectiveFeign.FeignInvocationHanlder 的实例)

根据方法名进入了某个 SynchronousMethodHandler 对象的 invoke 方法。

SynchronousMethodHandler 其实也并不处理具体的 HTTP 请求,它关心的更多的是请求结果的处理。
HTTP 请求的过程,包括服务发现,都交给了当前 context 注册中的 Client 实现类比如: LoadBalancerFeignClient

Retry 的逻辑实际上已经提出来了
但是 fallback 并没有在上面体现,因为我们上面分析动态代理的过程中,用的是 Feign.Builder
而如果有 fallback 的情况下,会使用 HystrixFeign.Builder
这是 Feign.Builder的一个子类。

它在创建动态代理的时候,主要改了一个东西就是 invocationFactory 从默认的 InvocationHandlerFactory.Default变成了一个内部匿名工厂,这个工厂的create 方法返回的不是 ReflectiveFeign.FeignInvocationHandler,而是HystrixInvocationHandler

所以动态代理类换掉了,invoke 的逻辑就变了,每次Invoke都会带HystrixCommand。
HystrixCommandSynchronousMethodHandlerfallback一起封装

1
HystrixInvocationHandler.this.dispatch.get(method).invoke(args);

在新的逻辑里简单的将方法转到对应的SynchronousMethodHandler上面并且执行该对象。






工作流程

Spring 通过调用其 FeignClientsRegistrar#registerBeanDefinitions 方法来获取其提供的 bean definition。

这里会往 Registry 里面添加两种 BeanDefinition,

  • 一个是 FeignClientSpecification
    主要可调整项是通过 EnableFeignClients 注解的 defaultConfiguration 参数传入。

    1
    registerClientConfiguration(registry, name,defaultAttrs.get("defaultConfiguration"));
  • 一个是负责注册 FeignClient,分为以下几步

    1. 找到 basePackage 下面所有包含了 FeignClient 注解的类
    2. 读取类上面的 FeignClient 注解参数
    3. 如果该注解包括了 configuration 参数则先注册 configuration 所指定的类。
      这个类也是包装在 FeignClientSpecification 里面的,在 FeignClient 上指定的 configuration 类是它的一个属性。

      1
      registerClientConfiguration(registry, name,attributes.get("configuration"));
    4. 注册该注解了 FeignClient 的接口,生成 BeanDefinition 时是以 FeignClientFactoryBean 作为对象创建的,而使用了 FeignClient 注解的接口是作为该 Bean 的一个属性,同时,对于 FeignClient 注解配置的参数,比如 fallback 等都一并作为参数放入 BeanDefinition 中。

总结一下,这些 BeanDefinition 分为两类:

  • FeignClientSpecification,包括了所有 FeignClient 上指定 configuration 以及在 EnableFeignClients 上指定的 defaultConfiguration。

    1
    2
    3
    4
    前者的名字为注解的 URL 或者 value 组成  
    比如TEST-SERVICE
    后者的名字为 default.$CLASSNAME
    比如 default.name.org.xiang.SpringBootApp。
  • FeignClientFactoryBean,它包含了所有使用了 FeignClient 注解的接口信息以及注解上面的参数。
1
2
它的名字为注解的 URL 或者 value,比如 TEST-SERVICE
跟它上面的 configuration 创建出来的 bean 定义是同一个名字。

FeignAutoConfiguration

给当前环境装配Targeter以及Client

1
2
@Autowired(required = false) //自动注入FeignClientSpecification
private List<FeignClientSpecification> configurations = new ArrayList<>();

后期各种AutoConfiguration来实现自动装配

1
2
例如:FeignRibbonClientAutoConfiguration  
装配Client为LoadBalancerFeignClient

FeignClientFactoryBean

它是一个工厂类,Spring Context 创建 Bean 实例时会调用它的 getObject 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Object getObject() throws Exception {
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);

if (!StringUtils.hasText(this.url)) {
String url;
if (!this.name.startsWith("http")) {
url = "http://" + this.name;
}
else {
url = this.name;
}
url += cleanPath();
return loadBalance(builder, context, new HardCodedTarget<>(this.type,
this.name, url));
}
if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
this.url = "http://" + this.url;
}
String url = this.url + cleanPath();
Client client = getOptional(context, Client.class);
if (client != null) {
if (client instanceof LoadBalancerFeignClient) {
// not lod balancing because we have a url,
// but ribbon is on the classpath, so unwrap
client = ((LoadBalancerFeignClient)client).getDelegate();
}
builder.client(client);
}
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, new HardCodedTarget<>(
this.type, this.name, url));
}

Configuration 与 FeignBuilder 关联

1
2
FeignContext context = applicationContext.getBean(FeignContext.class);
Feign.Builder builder = feign(context);

spring-cloud 也提供了一个 Feign 的初始化配置类:FeignAutoConfiguration。
它初始化了一个新的 FeignContext bean,并且把所有的 configuration 都放在 FeignContext 里面。

FeignContext 继承了 NamedContextFactory,它会管理一批 Context
外部调用的时候会指定用哪个 context 来寻找对应的 bean
而 context 如果不存在,则会创建一个新的 AnnotationConfigApplicationContext。
创建 context 的时候,会用 name 去匹配已有的 configurations(加载该 FeignClient 注解里面提供的 configuration 属性类)
如果有同名的,则就将该 configuration 注册进 context
另外如果有 default 开头的 configuration,也会将其注册到 context 里面
最后调用 refresh 方法对 context 进行初始化。
初始化以后,相当于 configuration 里面提供的encoder,decoder 这些就逗号了。

在 FeignClientFactoryBean 创建 Bean 的时候它先从 applicationContext 里面找到已经构建好的 feignContext

初始化一个 FeignBuilder,FeignBuilder 会利用 context 里面包含的对应的 configuration 指定的 bean
获取指定的类,比如 decoder,encoder,retryer,errorDecoder。
然后再去寻找 context 中的 Client bean。


Client Bean

1
Client client = getOptional(context, Client.class);

如果当前路径里面有Ribbon,那么 Spring Boot 启动时就会创建一个LoadBalancerFeignClient
如果没有,FeignAutoConfiguration 里面也会自己去创建 ApacheHttpClient 或者 OKHttpClient
FeignBuilder 会拿这个 client 配到自己里面。

Ribbon的负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
LoadBalancerFeignClient 做为FeignRibbonClientAutoConfiguration#feignClient引入
LoadBalancerFeignClient#lbClient利用内部成员变量CachingSpringLoadBalancerFactory创建合适的Client
主要看是否有重试?RetryableFeignLoadBalancer:FeignLoadBalancer,这两个类都继承AbstractLoadBalancerAwareClient
executeWithLoadBalancer就是执行父抽象类AbstractLoadBalancerAwareClient#executeWithLoadBalancer
由LoadBalancerCommand以RxJava方式去submit一个请求执行
LoadBalancerCommand#selectServer选择服务器间接引出LoadBalancerContext再引出ILoadBalancer



所以具体交给了ILoadBalancer来处理,ILoadBalancer通过配置IRule、IPing等信息
由BaseLoadBalancer引出了IRule.choose(key);默认为RoundRobinRule

并向EurekaClient获取注册列表的信息,并默认10秒一次向EurekaClient发送"ping",进而检查是否更新服务列表
最后得到注册列表后,ILoadBalancer根据IRule的策略进行负载均衡。


而RestTemplate被@LoadBalance注解后能过用负载均衡;主要是维护了一个被@LoadBalance注解的RestTemplate列表,并给列表中的RestTemplate添加拦截器,进而交给负载均衡器去处理。

Targeter 生产代理类

1
2
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, new HardCodedTarget<>(this.type, this.name, url));

然后是最关键的一步,获取 Targeter 来生成动态代理类,在 FeignAutoConfiguration 里面,
指定了生成 HystrixTargeter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return loadBalance(builder, context, new HardCodedTarget<>(this.type,this.name, url));

protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
HardCodedTarget<T> target) {
Client client = getOptional(context, Client.class);
if (client != null) {
builder.client(client);
Targeter targeter = get(context, Targeter.class);
return targeter.target(this, builder, context, target);
}

throw new IllegalStateException(
"No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-ribbon?");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
SetterFactory setterFactory = getOptional(factory.getName(), context,
SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(factory.getName(), context, target, builder, fallback);
}
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
}

return feign.target(target);
}

如果 FeignClient 没有定义 fallback,或者说 Builder 不是 HystrixFeignBuilder,则直接用 FeignBuidler 的 target 方法生成代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> T target(Target<T> target) {
return build().newInstance(target);
}

public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory);
}

生成代理

ReflectiveFeign 生成动态代理对象(newInstance)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public <T> T newInstance(Target<T> target) {
//为每个方法创建一个SynchronousMethodHandler对象,并放在 Map 里面。
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if(Util.isDefault(method)) {
//如果是 default 方法,说明已经有实现了,用 DefaultHandler
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
//否则就用上面的 SynchronousMethodHandler
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
// 创建动态代理,factory 是 InvocationHandlerFactory.Default,创建出来的是 ReflectiveFeign.FeignInvocationHanlder,也就是说后续对方法的调用都会进入到该对象的 inovke 方法。
InvocationHandler handler = factory.create(target, methodToHandler);
// 创建动态代理对象
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}

public Map<String, MethodHandler> apply(Target key) {
//取出要实现的接口的所有方法
List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type());
Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>();
for (MethodMetadata md : metadata) {
//根据目标接口类和方法上的注解信息判断该用哪种 buildTemplate
BuildTemplateByResolvingArgs buildTemplate;
if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null) {
buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder);
} else if (md.bodyIndex() != null) {
buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder);
} else {
buildTemplate = new BuildTemplateByResolvingArgs(md);
}
//调用synchronousMethodHandlerFactory来生成SynchronousMethodHandler对象。这个就是对接口某个方法的实现。
result.put(md.configKey(),
factory.create(key, md, buildTemplate, options, decoder, errorDecoder));
}
return result;
}

synchronousMethodHandlerFactory 的 create 方法。

1
2
3
4
5
6
7
public MethodHandler create(Target<?> target, MethodMetadata md,
RequestTemplate.Factory buildTemplateFromArgs,
Options options, Decoder decoder, ErrorDecoder errorDecoder) {
return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
logLevel, md, buildTemplateFromArgs, options, decoder,
errorDecoder, decode404);
}

ReflectiveFeign.FeignInvocationHanlder 的 invoke 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//通过动态代理实现了几个通用方法,比如 equals、toString、hasCode
if ("equals".equals(method.getName())) {
try {
Object
otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
//找到具体的 method 的 Handler,然后调用 invoke 方法。这样就又进入了SynchronousMethodHandler对象的 invoke 方法。
return dispatch.get(method).invoke(args);
}

SynchronousMethodHandler 的 invoke 方法主要是应用 encoder,decoder 以及 retry 等配置并且自身对于调用结果有一定的处理逻辑。

我们最关心的请求实现,实际上是在组装 SynchronousMethodHandler 的 client 参数上
即前面提到的,如果当前路径里面有 Ribbon,就是 LoadBalancerFeignClient
如果没有,根据配置生成 ApacheHttpClient 或者 OKHttpClient。
在 Ribbon 里面,实现了 Eureka 服务发现以及进行请求等动作,当然 Ribbon 里面还带了负载均衡逻辑。

SynchronousMethodHandler#invoke方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}

Object executeAndDecode(RequestTemplate template) throws Throwable {
Request request = targetRequest(template);

if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}

Response response;
long start = System.nanoTime();
try {
//通过 client 获得请求的返回值
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

boolean shouldClose = true;
try {
if (logLevel != Logger.Level.NONE) {
response =
logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
}
if (Response.class == metadata.returnType()) {
if (response.body() == null) {
return response;
}
if (response.body().length() == null ||
response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
return response;
}
// Ensure the response body is disconnected
byte[] bodyData = Util.toByteArray(response.body().asInputStream());
return response.toBuilder().body(bodyData).build();
}
if (response.status() >= 200 && response.status() < 300) {
if (void.class == metadata.returnType()) {
return null;
} else {
return decode(response);
}
} else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
return decode(response);
} else {
throw errorDecoder.decode(metadata.configKey(), response);
}
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
}
throw errorReading(request, response, e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}


Feign调用流程

业务FeignClient的调用->ReflectiveFeign.FeignInvocationHandler#invoker

invoker代码引出如下

1
return dispatch.get(method).invoke(args);

SynchronousMethodHandler#invoke

  • 创建合适的RequestTemplate
  • 执行executeAndDecode
    • 通过targetRequestRequestTemplate转换为request
    • client.execute(request, options);(由client引出LoadBalancerFeignClient)
  • LoadBalancerFeignClient#execute
    • 根据request构建ribbonRequest
    • lbClient(clientName).executeWithLoadBalancer(ribbonRequest,requestConfig).toResponse();
  • lbClient(clientName)(通过lbClientFactory工厂创建合适的client去执行负载请求)
    • 获取相关的ILoadBalancer,IClientConfig构建FeignLoadBalancer
  • FeignLoadBalancer#executeWithLoadBalancer执行点在基础抽象类AbstractLoadBalancerAwareClient#executeWithLoadBalancer
    • 构建命令LoadBalancerCommand
    • 命令执行command.submit
      • selectServer选取合适的服务列表
      • AbstractLoadBalancerAwareClient.this.execute(引出FeignLoadBalancer#execute)
  • FeignLoadBalancer#execute
    • Client.Default#execute(HttpURLConnection请求回复)






Ribbon

ribbon的6个主要组件:IRuleIPingServerListServerListFilterServerListUpdaterILoadBalancer

功能说明

Ribbon主要包括如下功能

  • 1.支持通过DNS和IP和服务端通信
  • 2.可以根据算法从多个服务中选取一个服务进行访问
  • 3.通过将客户端和服务器分成几个区域(zone)来建立客户端和服务器之间的关系。客户端尽量访问和自己在相同区域(zone)的服务,减少服务的延迟
  • 4.保留服务器的统计信息,ribbon可以实现用于避免高延迟或频繁访问故障的服务器
  • 5.保留区域(zone)的统计数据,ribbon可以实现避免可能访问失效的区域(zone)

Ribbon主要组件

Ribbon主要包含如下组件:

  • 1.IRule
  • 2.IPing
  • 3.ServerList
  • 4.ServerListFilter
  • 5.ServerListUpdater
  • 6.IClientConfig
  • 7.ILoadBalancer


IRule

功能:根据特定算法中从服务列表中选取一个要访问的服务

常用IRule实现有以下几种:

  • RoundRobinRule
    轮询规则,默认规则。同时也是更高级rules的回退策略

  • AvailabilityFilteringRule
    这个负载均衡器规则,会先过滤掉以下服务:

a. 由于多次访问故障而处于断路器跳闸状态
b. 并发的连接数量超过阈值
然后对剩余的服务列表按照RoundRobinRule策略进行访问

  • WeightedResponseTimeRule
    根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换到WeightedResponseTimeRule。

  • RetryRule
    先按照RoundRobinRule的策略获取服务,如果获取服务失败,则在指定时间内会进行重试,获取可用的服务

  • BestAvailableRule
    此负载均衡器会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务

  • RandomRule
    随机获取一个服务


IPing

功能:在后台运行的一个组件,用于检查服务列表是否都活

  • NIWSDiscoveryPing
    不执行真正的ping。如果Discovery Client认为是在线,则程序认为本次心跳成功,服务活着

  • PingUrl
    此组件会使用HttpClient调用服务的一个URL,如果调用成功,则认为本次心跳成功,表示此服务活着。

  • NoOpPing
    永远返回true,即认为服务永远活着

  • DummyPing
    默认实现,默认返回true,即认为服务永远活着


ServerList

功能:存储服务列表。分为静态和动态。如果是动态的后台有个线程会定时刷新和过滤服务列表

常用ServerList 实现有以下几种:

  • ConfigurationBasedServerList
    从配置文件中获取所有服务列表

配置例子:

1
sample-client.ribbon.listOfServers=www.microsoft.com:80,www.yahoo.com:80,www.google.com:80
  • DiscoveryEnabledNIWSServerList

从Eureka Client中获取服务列表。此值必须通过属性中的VipAddress来标识服务器集群。DynamicServerListLoadBalancer会调用此对象动态获取服务列表

  • DomainExtractingServerList
    代理类,根据ServerList的值实现具体的逻辑


ServerListFilter

该接口允许过滤配置或动态获取的具有所需特性的服务器列表。ServerListFilter是DynamicServerListLoadBalancer用于过滤从ServerList实现返回的服务器的组件。

常用ServerListFilter 实现有以下几种:

  • ZoneAffinityServerListFilter
    过滤掉所有的不和客户端在相同zone的服务,如果和客户端相同的zone不存在,才不过滤不同zone有服务。

启用此配置使用以下配置

1
<clientName>.ribbon.EnableZoneAffinity=true
  • ZonePreferenceServerListFilter

ZoneAffinityServerListFilter的子类。和ZoneAffinityServerListFilter相似,但是比较的zone是发布环境里面的zone。
过滤掉所有和客户端环境里的配置的zone的不同的服务,如果和客户端相同的zone不存在才不进行过滤。

  • ServerListSubsetFilter

ZoneAffinityServerListFilter的子类。此过滤器确保客户端仅看到由ServerList实现返回的整个服务器的固定子集。
它还可以定期用新服务器替代可用性差的子集中的服务器。

要启用此过滤器,请指定以下属性:

1
2
3
4
5
6
<clientName>.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList 
# the server must register itself with Eureka server with VipAddress "myservice"
<clientName>.ribbon.DeploymentContextBasedVipAddresses=myservice
<clientName>.ribbon.NIWSServerListFilterClassName=com.netflix.loadbalancer.ServerListSubsetFilter
# only show client 5 servers. default is 20.
<clientName>.ribbon.ServerListSubsetFilter.size=5


ServerListUpdater

功能:被DynamicServerListLoadBalancer用于动态的更新服务列表。

常用的实现类:

  • PollingServerListUpdater

默认的实现策略。此对象会启动一个定时线程池,定时执行更新策略

  • EurekaNotificationServerListUpdater

当收到缓存刷新的通知,会更新服务列表。


IClientConfig

功能:定义各种配置信息,用来初始化ribbon客户端和负载均衡器

常用IClientConfig实现有以下几种:

  • DefaultClientConfigImpl

IClientConfig的默认实现,配置文件里的部分值为ribbon。


ILoadBalancer

定义软件负载平衡器操作的接口。动态更新一组服务列表及根据指定算法从现有服务器列表中选择一个服务

  • DynamicServerListLoadBalancer

DynamicServerListLoadBalancer组合Rule、IPing、ServerList、ServerListFilter、ServerListUpdater 实现类,实现动态更新和过滤更新服务列表

  • ZoneAwareLoadBalancer

这是DynamicServerListLoadBalancer的子类,主要加入zone的因素。统计每个zone的平均请求的情况,保证从所有zone选取对当前客户端服务最好的服务组列表


默认的ribbon实现类

  • IClientConfig ribbonClientConfig: DefaultClientConfigImpl
  • IRule ribbonRule: ZoneAvoidanceRule
  • IPing ribbonPing: DummyPing
  • ServerList ribbonServerList: ConfigurationBasedServerList
  • ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
  • ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
  • ServerListUpdater ribbonServerListUpdater: PollingServerListUpdater


通过配置文件配置Ribbon的主要组件

1
2
3
4
5
<clientName>.ribbon.NFLoadBalancerClassName=xx
<clientName>.ribbon.NFLoadBalancerRuleClassName=xx
<clientName>.ribbon.NFLoadBalancerPingClassName=xx
<clientName>.ribbon.NIWSServerListClassName=xx
<clientName>.ribbon.NIWSServerListFilterClassName=xx

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class MyDefaultRibbonConfig {
// @Bean
// public IRule ribbonRule() {
// return new MyRule();
// }

@Bean
public IPing ribbonPing() {
return new MyPingUrl(new NIWSDiscoveryPing());
}

}






Hystrix

简介

方法 执行方式
#execute() 同步调用,返回直接结果
#queue() 异步调用,返回 java.util.concurrent.Future
#observe() 异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable() 未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
  • #toObservable() 方法 :未做订阅,返回干净的 Observable 。这就是为什么上文说”未调用” 。
  • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅 。
    ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。
  • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:
    • Observable#toBlocking() 方法 :将 Observable 转换成阻塞的 rx.observables.BlockingObservable 。
    • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。
      • #run() 方法 :子类实现该方法,执行正常的业务逻辑。
  • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。

img


应用

img

线程池隔离

Hystrix通过命令模式,将每个类型的业务请求封装成对应的命令请求

  • 查询订单->订单Command
  • 查询商品->商品Command
  • 查询用户->用户Command。
    每个类型的Command对应一个线程池。创建好的线程池是被放入到ConcurrentHashMap中
    1
    2
    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
    threadPools.put("hystrix-order", new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

final int dynamicCoreSize = corePoolSize.get();
final int dynamicMaximumSize = maximumPoolSize.get();

if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
} else {
return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
}
}

运用至项目-线程隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class GetOrderCommand extends HystrixCommand<List> {

OrderService orderService;

public GetOrderCommand(String name){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withMaxQueueSize(10) //配置队列大小
.withCoreSize(2) // 配置线程池里的线程数
)
);
}

@Override
protected List run() throws Exception {
return orderService.getOrderList();
}

public static class UnitTest {
@Test
public void testGetOrder(){
// new GetOrderCommand("hystrix-order").execute();
Future<List> future =new GetOrderCommand("hystrix-order").queue();
}

}
}

线程池隔离 - 优缺点

  • 线程池隔离的优点:

    • 应用程序会被完全保护起来,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。
    • 我们给应用程序引入一个新的风险较低的客户端lib的时候,如果发生问题,也是在本lib中,并不会影响到其他内容,因此我们可以大胆的引入新lib库。
    • 当依赖的一个失败的服务恢复正常时,应用程序会立即恢复正常的性能。
    • 如果我们的应用程序一些参数配置错误了,线程池的运行状况将会很快显示出来,比如延迟、超时、拒绝等。同时可以通过动态属性实时执行来处理纠正错误的参数配置。
    • 如果服务的性能有变化,从而需要调整,比如增加或者减少超时时间,更改重试次数,就可以通过线程池指标动态属性修改,而且不会影响到其他调用请求。
    • 除了隔离优势外,hystrix拥有专门的线程池可提供内置的并发功能,使得可以在同步调用之上构建异步的外观模式,这样就可以很方便的做异步编程(Hystrix引入了Rxjava异步框架)。
  • 线程池隔离的缺点:

    • 线程池的主要缺点就是它增加了计算的开销,每个业务请求(被包装成命令)在执行的时候,会涉及到请求排队,调度和上下文切换。不过Netflix公司内部认为线程隔离开销足够小,不会产生重大的成本或性能的影响。

信号量隔离

将属性execution.isolation.strategy设置为SEMAPHORE
象这样 ExecutionIsolationStrategy.SEMAPHORE,则Hystrix使用信号量而不是默认的线程池来做隔离。

线程池方式下业务请求线程和执行依赖的服务的线程不是同一个线程;
信号量方式下业务请求线程和执行依赖服务的线程是同一个线程

运用至项目-信号量隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

private final int id;

public CommandUsingSemaphoreIsolation(int id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.id = id;
}

@Override
protected String run() {
return "ValueFromHashMap_" + id;
}
}

信号量隔离 - 优缺点

信号量隔离的方式是限制了总的并发数,每一次请求过来,请求线程和调用依赖服务的线程是同一个线程,那么如果不涉及远程RPC调用(没有网络开销)则使用信号量来隔离,更为轻量,开销更小。
当我们依赖的服务是极低延迟的,比如访问内存缓存,就没有必要使用线程池的方式,那样的话开销得不偿失

熔断器

Hystrix在运行过程中会向每个commandKey对应的熔断器报告成功、失败、超时和拒绝的状态
熔断器维护计算统计的数据,根据这些统计的信息来确定熔断器是否打开。

如果打开,后续的请求都会被截断。然后会隔一段时间默认是5s,尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查
如果恢复,熔断器关闭,随后完全恢复调用。

熔断器
步骤4开始,Hystrix会检查Circuit Breaker的状态。
如果Circuit Breaker的状态为开启状态,Hystrix将不会执行对应指令,而是直接进入失败处理状态(图中8 Fallback)。
如果Circuit Breaker的状态为关闭状态,Hystrix会继续进行线程池、任务队列、信号量的检查(图中5)

熔断器 - 参数

1: circuitBreaker.enabled
是否启用熔断器,默认是TURE。
2: circuitBreaker.forceOpen
熔断器强制打开,始终保持打开状态。默认值FLASE。
3: circuitBreaker.forceClosed
熔断器强制关闭,始终保持关闭状态。默认值FLASE。
4: circuitBreaker.errorThresholdPercentage
设定错误百分比,默认值50%,例如一段时间(10s)内有100个请求,其中有55个超时或者异常返回了,那么这段时间内的错误百分比是55%,大于了默认值50%,这种情况下触发熔断器-打开。
5: circuitBreaker.requestVolumeThreshold
默认值20.意思是至少有20个请求才进行errorThresholdPercentage错误百分比计算。比如一段时间(10s)内有19个请求全部失败了。
错误百分比是100%,但熔断器不会打开,因为requestVolumeThreshold的值是20.
这个参数非常重要,熔断器是否打开首先要满足这个条件
6: circuitBreaker.sleepWindowInMilliseconds
半开试探休眠时间,默认值5000ms。当熔断器开启一段时间之后比如5000ms,会尝试放过去一部分流量进行试探,确定依赖服务是否恢复。

熔断模拟示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class GetOrderCircuitBreakerCommand extends HystrixCommand<String> {

public GetOrderCircuitBreakerCommand(String name){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)//默认是true,本例中为了展现该参数
.withCircuitBreakerForceOpen(false)//默认是false,本例中为了展现该参数
.withCircuitBreakerForceClosed(false)//默认是false,本例中为了展现该参数
.withCircuitBreakerErrorThresholdPercentage(5)//(1)错误百分比超过5%
.withCircuitBreakerRequestVolumeThreshold(10)//(2)10s以内调用次数10次,同时满足(1)(2)熔断器打开
.withCircuitBreakerSleepWindowInMilliseconds(5000)//隔5s之后,熔断器会尝试半开(关闭),重新放进来请求
// .withExecutionTimeoutInMilliseconds(1000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withMaxQueueSize(10) //配置队列大小
.withCoreSize(2) // 配置线程池里的线程数
)
);
}

@Override
protected String run() throws Exception {
Random rand = new Random();
//模拟错误百分比(方式比较粗鲁但可以证明问题)
if(1==rand.nextInt(2)){
// System.out.println("make exception");
throw new Exception("make exception");
}
return "running: ";
}

@Override
protected String getFallback() {
// System.out.println("FAILBACK");
return "fallback: ";
}

public static class UnitTest{

@Test
public void testCircuitBreaker() throws Exception{
for(int i=0;i<25;i++){
Thread.sleep(500);
HystrixCommand<String> command = new GetOrderCircuitBreakerCommand("testCircuitBreaker");
String result = command.execute();
//本例子中从第11次,熔断器开始打开
System.out.println("call times:"+(i+1)+" result:"+result +" isCircuitBreakerOpen: "+command.isCircuitBreakerOpen());
//本例子中5s以后,熔断器尝试关闭,放开新的请求进来
}
}
}
}

每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断拦截。

回退降级



在Hystrix执行非核心链路功能失败的情况下,如何处理,比如返回默认值等。
如果我们要回退或者降级处理,代码上需要实现HystrixCommand.getFallback()方法或者是HystrixObservableCommand. HystrixObservableCommand()。

降级回退方式

  • Fail Fast 快速失败

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Override
    protected String run() {
    if (throwException) {
    throw new RuntimeException("failure from CommandThatFailsFast");
    } else {
    return "success";
    }
    }
    //如果实现的是HystrixObservableCommand.java则 重写 resumeWithFallback方法
    @Override
    protected Observable<String> resumeWithFallback() {
    if (throwException) {
    return Observable.error(new Throwable("failure from CommandThatFailsFast"));
    } else {
    return Observable.just("success");
    }
    }
  • Fail Silent 无声失败
    返回null,空Map,空List

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Override
    protected String getFallback() {
    return null;
    }
    @Override
    protected List<String> getFallback() {
    return Collections.emptyList();
    }
    @Override
    protected Observable<String> resumeWithFallback() {
    return Observable.empty();
    }
  • Fallback: Static 返回默认值

    1
    2
    3
    4
    5
    6
    7
    8
    @Override
    protected Boolean getFallback() {
    return true;
    }
    @Override
    protected Observable<Boolean> resumeWithFallback() {
    return Observable.just( true );
    }
  • Fallback: Stubbed 自己组装一个值返回

    1
    2
    3
    4
    protected UserAccount getFallback() {
    return new UserAccount(customerId, "Unknown Name",
    countryCodeFromGeoLookup, true, true, false);
    }
  • Fallback: Cache via Network 利用远程缓存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Override
    protected String getFallback() {
    return new FallbackViaNetwork(id).execute();
    }

    private static class FallbackViaNetwork extends HystrixCommand<String> {
    private final int id;

    public FallbackViaNetwork(int id) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
    this.id = id;
    }

    @Override
    protected String run() {
    MemCacheClient.getValue(id);
    }

    @Override
    protected String getFallback() {
    return null;
    }
    }

Hystrix为我们提供了一套线上系统容错的技术实践方法,我们通过在系统中引入Hystrix的jar包可以很方便的使用线程隔离、熔断、回退等技术。同时它还提供了监控页面配置,方便我们管理查看每个接口的调用情况。
像spring cloud这种微服务构建模式中也引入了Hystrix,我们可以放心使用Hystrix的线程隔离技术,来防止雪崩这种可怕的致命性线上故障。

注解使用

  • 同步执行

    1
    2
    3
    4
    5
    6
    public class UserService {
    @HystrixCommand
    public User getUserById(String id) {
    return userResource.getUserById(id);
    }
    }
  • 异步执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @HystrixCommand
    public Future<User> getUserByIdAsync(final String id) {
    return new AsyncResult<User>() {
    @Override
    public User invoke() {
    return userResource.getUserById(id);
    }
    };
    }

Properties属性介绍

每个方法的相关Properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@HystrixCommand(groupKey = "hello",commandKey = "hello-service",threadPoolKey = "hello-pool",
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "101"),
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
},
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "5000"),
@HystrixProperty(name = "execution.isolation.strategy",value = "THREAD")},
fallbackMethod = "helloFallBack"
)

@DefaultPropertiesClass类上默认的Properties,不需要每个function都定义一次Properties

1
2
3
4
5
6
7
8
9
10
11
@DefaultProperties(groupKey = "DefaultGroupKey")
class Service {
@HystrixCommand // hystrix command group key is 'DefaultGroupKey'
public Object commandInheritsDefaultProperties() {
return null;
}
@HystrixCommand(groupKey = "SpecificGroupKey") // command overrides default group key
public Object commandOverridesGroupKey() {
return null;
}
}

fallback回退

  • fallback方法必须和指定fallback方法的主方法在一个类中
  • fallback方法的参数必须要和主方法的参数一致
    否则不生效
  • 使用fallback方法需要根据依赖服务设置合理的超时时间,即
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    commandProperties = {
    @HystrixProperty(
    name = "execution.isolation.thread.timeoutInMilliseconds",
    value = "5000")}



    @HystrixCommand(fallbackMethod = "fallback1")
    User getUserById(String id) {
    throw new RuntimeException("getUserById command failed");
    }

    @HystrixCommand(fallbackMethod = "fallback2")
    User fallback1(String id, Throwable e) {
    assert "getUserById command failed".equals(e.getMessage());
    throw new RuntimeException("fallback1 failed");
    }

    @HystrixCommand(fallbackMethod = "fallback3")
    User fallback2(String id) {
    throw new RuntimeException("fallback2 failed");
    }

    @HystrixCommand(fallbackMethod = "staticFallback")
    User fallback3(String id, Throwable e) {
    assert "fallback2 failed".equals(e.getMessage());
    throw new RuntimeException("fallback3 failed");
    }

    User staticFallback(String id, Throwable e) {
    assert "fallback3 failed".equals(e.getMessage());
    return new User("def", "def");
    }

    // test
    @Test
    public void test() {
    assertEquals("def", getUserById("1").getName());
    }

缓存功能

  • CacheResult
    @CacheResult方法可以用在我们之前的Service方法上,表示给该方法开启缓存,默认情况下方法的所有参数都将作为缓存的key如下:

    1
    2
    3
    4
    5
    @CacheResult
    @HystrixCommand
    public Book test6(Integer id,String aa) {
    return restTemplate.getForObject("http://HELLO-SERVICE/getbook5/{1}", Book.class, id);
    }
  • CacheKey
    当然除了使用默认数据之外,我们也可以使用@CacheKey来指定缓存的key

    1
    2
    3
    4
    5
    @CacheResult
    @HystrixCommand
    public Book test6(@CacheKey Integer id,String aa) {
    return restTemplate.getForObject("http://HELLO-SERVICE/getbook5/{1}", Book.class, id);
    }
  • CacheRemove
    这个当然是用来让缓存失效的注解,用法也很简单

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @CacheRemove(commandKey = "test6")
    @HystrixCommand
    public Book test7(@CacheKey Integer id) {
    return null;
    }

    @RequestMapping("/test6")
    public Book test6() {
    HystrixRequestContext.initializeContext();
    //第一次发起请求
    Book b1 = bookService.test6(2);
    //清除缓存
    bookService.test7(2);
    //缓存被清除,重新发起请求
    Book b2 = bookService.test6(2);
    //参数一致,使用缓存数据
    Book b3 = bookService.test6(2);
    return b1;
    }

HystrixCollapser 合并命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Service
public class MyService {
Logger logger = LoggerFactory.getLogger(MyService.class);

@HystrixCollapser(batchMethod = "getIds",
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")},
scope = GLOBAL)
public Future<String> getId(String id){
System.out.println("getId");
return null;
}

@HystrixCommand
public List<String> getIds(List<String> ids){
System.out.println("executing on thread id: "+ Thread.currentThread().getName() + ", id:"+ids.toString());
return ids.stream().map(data->String.valueOf(Integer.valueOf(data)+1)).collect(Collectors.toList());
}
}

@Controller
public class CollegeController {
@Autowired
private CollegeService collegeService;

@Autowired
private MyService service;


@RequestMapping("/collapser")
@ResponseBody
public String collapser() throws InterruptedException, ExecutionException {
Future<String> f1 = service.getId("1");
Future<String> f2 = service.getId("2");
String result1 = f1.get();
String result2 = f2.get();

return result1+result2;
}
}

流程

img

toObservable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
public Observable<R> toObservable() {
//_cmd 指向当前命令对象,用于下面实现 FuncX ,ActionX 内部类使用。
final AbstractCommand<R> _cmd = this;

final Action0 terminateCommandCleanup = new Action0() {

@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
handleCommandEnd(true); //user code did run
}
}
};

final Action0 unsubscribeCommandCleanup = new Action0() {
@Override
public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(true); //user code did run
}
}
};

//当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};

final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;

try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}

try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};

final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};

//声明缓存 Observable 。Hystrix 执行命令的 Observable 声明
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//一条命令只能执行一次
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}

//记录命令开始时间戳
commandStartTimestamp = System.currentTimeMillis();

if (properties.requestLogEnabled().get()) {
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}

//缓存存开关、KEY 。
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();

//如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回。
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

// 获得 执行命令Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
// 获得 缓存Observable
Observable<R> afterCache;


if (requestCacheEnabled && cacheKey != null) {
//当缓存特性开启,并且缓存未命中时,创建订阅了执行命令的 Observable的 HystrixCommandResponseFromCache
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
toCache.unsubscribe();
isResponseFromCache = true; // 标记 从缓存中结果
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {

afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}

return afterCache
.doOnTerminate(terminateCommandCleanup)
.doOnUnsubscribe(unsubscribeCommandCleanup)
.doOnCompleted(fireOnCompletedHook);
}
});
}

applyHystrixSemantics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};


private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

executionHook.onStart(_cmd);


if (circuitBreaker.allowRequest()) {
// 获得 信号量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();

// 信号量释放Action
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};

final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};

// 信号量 获得
if (executionSemaphore.tryAcquire()) {
try {
// 标记 executionResult 调用开始时间
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

// 获得 执行Observable
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
//链路处于熔断状态,调用 #handleShortCircuitViaFallback() 方法,处理链路熔断的失败回退逻辑
return handleShortCircuitViaFallback();
}
}

executeCommandAndObserve

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};

final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};


// 失败回退逻辑 Func1
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}

return handleFailureViaFallback(e);
}
}
};

// 请求缓存
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};

Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}

return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

executeCommandWithSpecifiedIsolation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {

if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
//执行隔离策略为 Thread


return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
}
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
}
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
//执行隔离策略为 SEMAPHORE

return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);


endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}