EZLippi-浮生志

使用FilterChain模式监控SQL调用

做过Web开发应该对FilterChain这个词不陌生,在Servlet应用中我们可以使用Filter来拦截拦截客户端请求,在响应返回给客户端之前截获请求,我们可以使用EncodingFilter来设置请求的编码,使用SecurityFilter来做权限控制,所有的Filter组成了一个完整的FilterChain. NIO框架Netty也是用了FilterChain的设计模式,可以配置Filter来完成请求的编解码,解决TCP的粘包和拆包问题等等…

这篇文章就介绍一下如何利用FilterChain模式来监控SQL的调用性能.

Filter定义

首先定义一个JdbcFilter,用于监听SQL相关操作,接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface JdbcFilter{
void initDataSource(DataSource source, JdbcFilter chain);

void closeConnection(Connection source, JdbcFilter chain) throws SQLException;

<T> T executeSingleStatement(SingleStatement source, SingleConnection conn, String sql, boolean autoCommit, Object params, JdbcFilter chain) throws SQLException;
}


public class DefaultJdbcFilterChain implements JdbcFilter {
protected final List<JdbcFilter> filters;

protected int index = 0;

public DefaultJdbcFilterChain(List<JdbcFilter> filters) {
this.filters = filters;
}
}

DataSource定义

我们在自定义的DataSource类里添加一个Filter列表,Filter可以从配置中获取,DataSource初始化时初始化一个FilterChain,依次调用每个Filter的initSingleDataSource()方法,最后调用真正的初始化方法initDataSourceOrigin,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SingleDataSource extends AbstractDataSource{
protected volatile List<JdbcFilter> filters;

@Override
public synchronized void init() {
this.filters = loadFilters()
initDataSourceWithFilters(this.config);
}

private DataSource initDataSourceWithFilters(final DataSourceConfig value) {
if (filters != null && filters.size() > 0) {
JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
@Override
public DataSource initSingleDataSource(SingleDataSource source, JdbcFilter chain) {
if (index < filters.size()) {
//依次调用每个Filter的方法
return filters.get(index++).initSingleDataSource(source, chain);
} else {
return source.initDataSourceOrigin(value);
}
}
};
return chain.initSingleDataSource(this, chain);
} else {
return initDataSourceOrigin(value);
}
}

}

Connection定义

如果我们要监听Connection的关闭事件,那就创建一个自定义的SingleConnection类,里面包含Filter列表,重写close()方法,在里面初始化FilterChain,依次调用Filter的closeSingleConnection()方法,最后再调用原始的close()方法,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SingleConnection implements Connection {
private final Connection conn;

private final SingleDataSource dataSource;

private final List<JdbcFilter> filters;

private void closeOrigin() throws SQLException {
conn.close();
}

@Override
public void close() throws SQLException {
if (filters != null && filters.size() > 0) {
JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
@Override
public void closeSingleConnection(SingleConnection source, JdbcFilter chain) throws SQLException {
if (index < filters.size()) {
//依次调用每个Filter的方法
filters.get(index++).closeSingleConnection(source, chain);
} else {
source.closeOrigin();
}
}
};
chain.closeSingleConnection(this, chain);
} else {
closeOrigin();
}
}

@Override
public Statement createStatement() throws SQLException {
Statement stmt = conn.createStatement();
return new SingleStatement(this.dsId, this, stmt, this.filters);
}
}

Statement定义

类似的,如果要监听Statement的执行事件,那就可以新建一个SingleStatement类,对原生的Statement做一层封装,在相应的SQL执行方法里调用FilterChain的方法,SingleConnection在初始化Statement时返回一个SingleStatement实例,并把filters引用传递给SingleStatement.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SingleStatement implements Statement {

protected final SingleConnection singleConnection;

protected final List<JdbcFilter> filters;

protected final Statement innerStatement;

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
final String processedSql = processSQL(sql, false);
return executeWithFilter(new JdbcOperationCallback<ResultSet>() {
@Override
public ResultSet doAction(Connection conn) throws SQLException {
ResultSet rs = innerStatement.executeQuery(processedSql);
return new SingleResultSet(rs, filters);
}
}, processedSql, null, false);
}

protected <T> T executeWithFilter(final JdbcOperationCallback<T> callback, final String sql, Object params) throws SQLException {
if (filters != null && filters.size() > 0) {
JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
@Override
public <T> T executeSingleStatement(SingleStatement source, SingleConnection conn, String sql, boolean autoCommit, Object params, JdbcFilter chain) throws SQLException {
if (index < filters.size()) {
//依次调用filter的方法
return filters.get(index++).executeSingleStatement(source, conn, sql, autoCommit, params, chain);
} else {
return (T) executeWithFilterOrigin(callback, conn);
}
}
};

return chain.executeSingleStatement(this, this.singleConnection, sql, this.singleConnection.getAutoCommit(), params, chain);
} else {
return executeWithFilterOrigin(callback, this.singleConnection);
}
}

}

获取SQL执行结果

最后我们在Filter里添加生命周期方法,比如获取Statement的执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
public class RealFilter extends DefaultJdbcFilter {
public <T> T executeSingleStatement(SingleStatement source, SingleConnection conn, String sql, boolean autoCommit, Object sqlParams, JdbcFilter chain) throws SQLException {

Object result = null;
try {
result = chain.executeSingleStatement(source, conn, sql, autoCommit, sqlParams, chain);
} catch (SQLException e) {
throw e;
}
return result;
}
}

前面的例子简单介绍了如何使用FilterChain模式来做SQL的监控,完整的源码可以参考zebra.

🐶 您的支持将鼓励我继续创作 🐶