在我们日常使用Redis实现分布式锁中,依赖如下
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.1</version>
</dependency>
在使用Redisson作为客户端,它需要与服务端进行通信,那么它的底层通信使用的是Netty.
在启动Redisson客户端时,底层Netty就已经与服务端建立好了通信(通道Channel).
简单写了一个示例代码
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class RedisClient {
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 单机模式
RedissonClient redissonClient = Redisson.create(config);
RLock redLock = redissonClient.getLock("computerLock");// 获取锁实例
try {
boolean isLock = redLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);
if (isLock) {
System.out.println("获取到锁,执行业务逻辑");
}
} catch (Exception x) {
} finally {
redLock.unlock();
System.out.println("释放锁");
}
}
}
以下代码摘录在源码,有部分无关紧要的代码删减
以上代码中,在执行
RedissonClient redissonClient = Redisson.create(config);
时候,就会创建Netty客户端,并与服务端建立好通信.
public static RedissonClient create(Config config) {
// #2 创建Redisson
Redisson redisson = new Redisson(config);
return redisson;
}
// 实例化Redisson
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
// #3
connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
public static ConnectionManager createConnectionManager(Config configCopy) {
UUID id = UUID.randomUUID();
if (configCopy.getMasterSlaveServersConfig() != null) {
return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
} else if (configCopy.getSingleServerConfig() != null) {
// #4 单机模式
return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
} else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
}
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
// #5
super(create(cfg), config, id);
}
super调用父类MasterSlaveConnectionManager构造器
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
initTimer(cfg);
// #6 内部创建Netty客户端
initSingleEntry();
}
protected void initSingleEntry() {
try {
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(this, config);
} else {
entry = createMasterSlaveEntry(config);
}
// #7
RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
public RFuture<RedisClient> setupMasterEntry(URI address) {
// #8 创建RedisClient
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
// #9
return setupMasterEntry(client);
}
在#8处会创建RedisClient,通过名字可以猜到,它是一个客户端对象,在它的内部有一个用于连接服务端的Netty的Bootstrap对象
private RedisClient(RedisClientConfig config) {
RedisClientConfig copy = new RedisClientConfig(config);
channels = new DefaultChannelGroup(copy.getGroup().next());
// 创建Bootstrap
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
this.commandTimeout = copy.getCommandTimeout();
}
在#9处便会通过Bootstrap对象连接服务端了
private RFuture<RedisClient> setupMasterEntry(final RedisClient client) {
final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
// #10 内部会连接服务端
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
}
}
});
return result;
}
public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> promise = new RedissonPromise<Void>();
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (future.isSuccess()) {
entries.add(entry);
}
}
});
// #11 内部会连接服务端
initConnections(entry, promise, true);
return promise;
}
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {
final int minimumIdleSize = getMinimumIdleSize(entry);
final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
int startAmount = Math.min(50, minimumIdleSize);
final AtomicInteger requests = new AtomicInteger(startAmount);
for (int i = 0; i < startAmount; i++) {
// #12 创建连接
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,
final int minimumIdleSize, final AtomicInteger initializedConnections) {
// #13 获取连接
acquireConnection(entry, new Runnable() {
@Override
public void run() {
RPromise<T> promise = new RedissonPromise<T>();
// #14 创建连接
createConnection(entry, promise);
promise.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
}
});
}
});
}
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
// #15 连接
RFuture<T> connFuture = connect(entry);
}
protected RFuture<T> connect(ClientConnectionsEntry entry) {
return (RFuture<T>) entry.connect();
}
public RFuture<RedisConnection> connect() {
// #16 这里的client就是之前创建的RedisClient
RFuture<RedisConnection> future = client.connectAsync();
return future;
}
public RFuture<RedisConnection> connectAsync() {
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
RFuture<InetSocketAddress> addrFuture = resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
f.tryFailure(future.cause());
return;
}
// #17 连接服务端
ChannelFuture channelFuture = bootstrap.connect(future.getNow());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
}
});
}
});
return f;
}
在#17处通过之前创建的Bootstrap对象,调用connect方法连接服务端,至此客户端就与服务端建立了连接,之后需要发送给服务端的命令,都通过这个建立好的连接发送出去.
最后系统中会多出许多'redisson-netty-1-x'命名的线程.它们都是已经和服务端建立好了连接,随时都可以进行通信.