SkyWalking 相关的源码详解笔记

分析

优点

  • 探针技术,无需开发任何介入,只需告知SpringCloud服务模块名称,即可在部署的时候加入链路追踪

  • 探针插件系统强大,基本符合开发常用的所有技术栈链路监控,也可自定义相关的链路追踪业务插件

  • 探针性能从网络Jemeter测试,skywalking的探针对吞吐量的影响最小,zipkin的吞吐量居中,pinpoint的探针对吞吐量的影响较为明显

  • 所有错误(超时.异常)全都会以log event记录在案,并以Alarm界面展现

  • collector服务端代码模块很好,强大的Module/Provider结构模块自定义扩展

  • collector后端数据流采用了Streaming,类似简约Storm结构处理

  • 支持业务工具toolkit,自己标注tag

    1
    2
    3
    4
    5
    6
    7
    @Trace//@Trace 注解的方法,会创建一个LocalSpan   
    @GetMapping("/log")
    public String log() {
    ActiveSpan.tag("mp", "test");
    System.out.println("traceId:" + TraceContext.traceId());
    return "log";
    }

缺点

  • 部署的时候,Host问题

  • WebUI trace排序缺少倒叙正序

  • WebUI log只有官方定义Exception,日志查询需要后期与通过traceId关联ELK查询,也可以通过ActiveSpan.tag进行新增tag

  • 异常报警,虽然有界面,但是缺少短信和邮件相关功能支持

由于该技术用了JavaAgent,探针技术所以先传阐述一下探针技术

JavaAgent

jdk1.5以后引入了javaAgent技术,javaAgent是运行方法之前的拦截器。

我们利用javaAgent和ASM字节码技术,在JVM加载class二进制文件的时候.

利用ASM动态的修改加载的class文件,在监控的方法前后添加计时器功能.

用于计算监控方法耗时

PreMain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package agent;
import java.lang.instrument.Instrumentation;

public class pre_MyProgram {
/**
* 该方法在main方法之前运行,与main方法运行在同一个JVM中
* 并被同一个System ClassLoader装载
* 被统一的安全策略(security policy)和上下文(context)管理
*
* @param agentOps
* @param inst
* @author SHANHY
* @create 2016年3月30日
*/
public static void premain(String agentOps,Instrumentation inst){

System.out.println("====premain 方法执行");
System.out.println(agentOps);

// 添加Transformer
inst.addTransformer(new MyTransformer());
}

/**
* 如果不存在 premain(String agentOps, Instrumentation inst)
* 则会执行 premain(String agentOps)
*
* @param agentOps
* @author SHANHY
* @create 2016年3月30日
*/
public static void premain(String agentOps){

System.out.println("====premain方法执行2====");
System.out.println(agentOps);
}
public static void main(String[] args) {
// TODO Auto-generated method stub

}
}

Transformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class MyTransformer implements ClassFileTransformer {

final static String prefix = "\nlong startTime = System.currentTimeMillis();\n";
final static String postfix = "\nlong endTime = System.currentTimeMillis();\n";

// 被处理的方法列表
final static Map<String, List<String>> methodMap = new HashMap<String, List<String>>();

public MyTransformer() {
add("com.shanhy.demo.TimeTest.sayHello");
add("com.shanhy.demo.TimeTest.sayHello2");
}

private void add(String methodString) {
String className = methodString.substring(0, methodString.lastIndexOf("."));
String methodName = methodString.substring(methodString.lastIndexOf(".") + 1);
List<String> list = methodMap.get(className);
if (list == null) {
list = new ArrayList<String>();
methodMap.put(className, list);
}
list.add(methodName);
}

@Override
public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined,
ProtectionDomain protectionDomain, byte[] classfileBuffer) throws IllegalClassFormatException {
className = className.replace("/", ".");
if (methodMap.containsKey(className)) {// 判断加载的class的包路径是不是需要监控的类
CtClass ctclass = null;
try {
ctclass = ClassPool.getDefault().get(className);// 使用全称,用于取得字节码类<使用javassist>
for (String methodName : methodMap.get(className)) {
String outputStr = "\nSystem.out.println(\"this method " + methodName
+ " cost:\" +(endTime - startTime) +\"ms.\");";

CtMethod ctmethod = ctclass.getDeclaredMethod(methodName);// 得到这方法实例
String newMethodName = methodName + "$old";// 新定义一个方法叫做比如sayHello$old
ctmethod.setName(newMethodName);// 将原来的方法名字修改

// 创建新的方法,复制原来的方法,名字为原来的名字
CtMethod newMethod = CtNewMethod.copy(ctmethod, methodName, ctclass, null);

// 构建新的方法体
StringBuilder bodyStr = new StringBuilder();
bodyStr.append("{");
bodyStr.append(prefix);
bodyStr.append(newMethodName + "($$);\n");// 调用原有代码,类似于method();($$)表示所有的参数
bodyStr.append(postfix);
bodyStr.append(outputStr);
bodyStr.append("}");

newMethod.setBody(bodyStr.toString());// 替换新方法
ctclass.addMethod(newMethod);// 增加新方法
}
return ctclass.toBytecode();
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}
return null;
}
}

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TimeTest {

public static void main(String[] args) {
sayHello();
sayHello2("hello world222222222");
}

public static void sayHello() {
try {
Thread.sleep(2000);
System.out.println("hello world!!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void sayHello2(String hello) {
try {
Thread.sleep(1000);
System.out.println(hello);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

====premain 方法执行
Hello1
====premain 方法执行
Hello2

hello world!!
this method sayHello cost:2000ms.
hello world222222222
this method sayHello2 cost:1000ms.




Client

客户端起点在于配置初始化
org.skywalking.apm.agent.core.conf.SnifferConfigInitializer,Agent 配置初始化器。

配置类有 Config 和 RemoteDownstreamConfig 两种。

  • Config 为 Agent 本地配置类,使用 SnifferConfigInitializer 进行初始化。
  • RemoteDownstreamConfig 为 Agent 远程配置类,从 Collector Server 读取。
1
2
3
4
5
6
7
8
SnifferConfigInitializer#initialize

在loadConfig()处开始从从{agent目录}/config/agent.config

后overrideConfigBySystemProp()处又开始读取环境变量覆盖配置

agent.config 为 agent.application_code
则环境变量为 skywalking.agent.application_code

Config

Agent 本地配置类
此类内部主要有Agent / Collector / Jvm / Buffer / Dictionary / Logging / Plugin 七个小类

RemoteDownstreamConfig

Agent 远程配置类。
只有Agent小类

Plugin

SkyWalking Agent 提供了多种插件,实现不同框架的透明接入SkyWalking。

apm-sniffer/apm-sdk-plugin目录下,有插件的实现代码

PluginBootstrap

插件引导程序类,创建需要加载的插件对象数组。
org.skywalking.apm.agent.core.plugin.PluginBootstrap

通过loadPlugins()作如下事项

  • 初始化AgentClassLoader
  • 获得插件定义路径数组
  • 获得插件定义PluginDefine
  • 创建类增强插件定义AbstractClassEnhancePluginDefine,并提供define形成builder

    1
    不同插件定义不同的切面,记录调用链路

PluginFinder

插件发现者

提供find(typeDescription)查找相关的类增强插件AbstractClassEnhancePluginDefine

  • find()查找AbstractClassEnhancePluginDefine对象
    • 处理NameMatch为匹配的AbstractClassEnhancePluginDefine对象,添加到nameMatchDefine属性。
    • 处理非NameMatch为匹配的AbstractClassEnhancePluginDefine对象,添加到signatureMatchDefine属性。

ServiceManager

BootService管理器,负责管理和初始化BootService实例们
org.skywalking.apm.agent.core.boot.ServiceManager

  • loadAllServices()方法,加载所有BootService实例类的实例数组
    ServiceManager 基于SPI (Service Provider Interface)机制
    /resources/META-INF.services/org.skywalking.apm.agent.core.boot.BootService文件里
    定义了所有BootService的实现类。

  • beforeBoot()调用每个BootService#beforeBoot()方法

  • startup(),调用每个BootService#boot()方法
  • afterBoot()调用每个BootService#afterBoot()方法

BootService

1
2
3
4
5
6
7
org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService

定义以上七个,分别有beforeBoot() / boot() / afterBoot() / shutdown() 接口方法。


Plugin

插件的加载

Agent 初始化时,调用PluginBootstrap#loadPlugins()方法,加载所有的插件。

AgentClassLoader

应用透明SkyWalking,不会显式导入SkyWalking依赖,所以自定义ClassLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static AgentClassLoader initDefaultLoader() throws AgentPackageNotFoundException {
DEFAULT_LOADER = new AgentClassLoader(PluginBootstrap.class.getClassLoader());
return getDefault();
}


public class AgentClassLoader extends ClassLoader {

/**
* The default class loader for the agent.
*/
private static AgentClassLoader DEFAULT_LOADER;

/**
* classpath
*/
private List<File> classpath;
/**
* Jar 数组
*/
private List<Jar> allJars;
/**
* Jar 读取时的锁
*/
private ReentrantLock jarScanLock = new ReentrantLock();

public AgentClassLoader(ClassLoader parent) throws AgentPackageNotFoundException {
super(parent);
File agentDictionary = AgentPackagePath.getPath();
classpath = new LinkedList<File>();
classpath.add(new File(agentDictionary, "plugins"));
classpath.add(new File(agentDictionary, "activations"));
}
}

在 ClassLoader 加载资源(例如,类),会调用如下

  • #findResource(name)
  • #findResources(name)

PluginResourcesResolver

插件资源解析器,读取所有插件(/resource/skywalking-plugin.def)定义文件

#getResources()方法,获得插件定义路径数组

使用了AgentClassLoader获得所有skywalking-plugin.def的路径。

PluginCfg

插件定义配置,读取玩skywalking-plugin.def
生成插件定义(org.skywalking.apm.agent.core.plugin.PluginDefinie)数组。

AbstractClassEnhancePluginDefine

类增强插件定义抽象基类。

不同插件通过实现AbstractClassEnhancePluginDefine抽象类

定义不同框架的切面记录调用链路


Server

Collector 组件

OAPServerStartUp

skyWalking Collector 启动入口

  • 调用ApplicationConfiguration#load()方法,加载Collector配置。
  • 调用ModuleManager#init(...)方法,初始化Collector组件们。

ApplicationConfigLoader

绿框部分,对应一个组件配置类。
红框部分,对应一个组件服务提供者配置类。

理一下关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14

Collectior使用组件管理器(`ModuleManager`)管理多个组件(`Module`)
一个组件有多种组件服务提供者(`MudoleProvider`)
同时一个组件只允许使用一个组件服务提供者

Collector使用一个应用配置类(`ApplicationConfiguration`)
一个应用配置类包含多个组件配置类(`ModuleConfiguration`)
每个组件对应一个组件配置类

一个组件配置类包含多个组件服务提供者配置(`ProviderConfiguration`)。
每个组件服务提供者对应一个组件配置类。

注意:因为一个组件只允许同时使用一个组件服务提供者,
所以一个组件配置类只设置一个组件服务提供者配置。

调用 #loadConfig() 方法

从 apm-collector-core 的 application.yml 加载自定义配置。

overrideConfigBySystemEnv

从系统环境覆盖配置

ModuleManager

ModuleManager 组件管理器,负责组件的管理与初始化。

init() 初始化组件

ServiceLoader.load(ModuleDefine.class);j加载所有Module实现类数组

ServiceManager 基于SPI (Service Provider Interface)机制

在每个serverPlugin项目的/resources/META-INF.services/org.skywalking.apm.collector.core.module.Module文件里

定义了该项目 Module 的实现类。

遍历Module实现的实例数组,创建配置中的Module实现类的实例

  • 创建Module对象
  • 调用prepare(...)方法,执行Module准备阶段的逻辑
    • 方法内部寻找ServiceLoader.load(ModuleProvider.class);
    • 实例化然后ModuleProvider添加到loadedProviders
  • BootstrapFlow#start(...)方法,执行Module启动逻辑
  • BootstrapFlow#notifyAfterCompleted()方法,执行Module启动完成,通知ModuleProvider

Collector处理Trace数据

Collector Streaming 流式处理

graph包的基础上:提供异步、跨节点等等的流式处理的封装

1
2
3
在 Storm 中,先要设计一个用于实时计算的图状结构,我们称之为拓扑(topology)。
这个拓扑将会被提交给集群,由集群中的主控节点(master node)分发代码
将任务分配给工作节点(worker node)执行。
  • Graph :定义了一个数据在各个 Node 的处理拓扑图。
  • WayToNode :提交数据给 Node 的方式。
  • Node :节点,包含一个 NodeProcessor 和 一个 Next 。
    • NodeProcessor :Node 处理器,处理数据。
    • Next :包含 WayToNode 数组,即 Node 提交数据给 Next 的 Node 数组的方式。

Graph

Graph创建

创建Graph的顺序图如下

  • 第一步GraphManager#createIfAbsent(graphId, input)方法,创建一个Graph对象,并添加到 Graph 映射

  • 第二步Graph#addNode(WayToNode)方法,创建该Graph的首个Node对象。

  • 第三步Node#addNext(WayToNode)方法,创建该Node的下一个Node对象。

Graph 启动

数据流向 FROM TO 逻辑
第一步 Graph WayToNode
第二步 WayToNode Node
第三步 Node NodeProcessor
第四步 NodeProcessor Next 根据具体实现,若到 Next ,重复第一步

WayToNode

graph包的基础上,提供异步、跨节点等等的流式处理的封装。主要在 WayToNode 、NodeProcessor 的实现类上做文章。

新版本的变动

各类Provider -> 各类ServerHandler -> 各类Handler预处理(JVMSourceDispatcher) -> DispatcherManager

DispatcherManager(根据scope选取dispatcher处理) -> AbstractWorker(真正的Work) -> DataCarrier(异步处理库)

1
IndicatorRemoteWorker直接remoteClient实例Push外网其他节点工作构建  并保存到异步处理库

数据消费处理

DataCarrier(异步处理库) -> consume(consumerPool#begin()) -> consumerThread.start() -> dataSource.obtain();

dataSource.obtain();(获得数据,开始消费) -> RemoteMessageConsumer/PersistentConsumer引出各类真正的Consumer -> 执行操作

Node转换为Worker,并且Worker以各类形态示人

芋道代码详解
代码来源