当前位置: 首页>后端>正文

Netty在Redis客户端中的应用

在我们日常使用Redis实现分布式锁中,依赖如下

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.1</version>
</dependency>

在使用Redisson作为客户端,它需要与服务端进行通信,那么它的底层通信使用的是Netty.

在启动Redisson客户端时,底层Netty就已经与服务端建立好了通信(通道Channel).

简单写了一个示例代码

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedisClient {

    public static void main(String[] args) {

        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");

        // 单机模式
        RedissonClient redissonClient = Redisson.create(config);
        RLock redLock = redissonClient.getLock("computerLock");// 获取锁实例

        try {
            boolean isLock = redLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);

            if (isLock) {
                System.out.println("获取到锁,执行业务逻辑");
            }
        } catch (Exception x) {
        } finally {
            redLock.unlock();
            System.out.println("释放锁");
        }
    }
}

以下代码摘录在源码,有部分无关紧要的代码删减

以上代码中,在执行

RedissonClient redissonClient = Redisson.create(config);

时候,就会创建Netty客户端,并与服务端建立好通信.

public static RedissonClient create(Config config) {
    // #2 创建Redisson
    Redisson redisson = new Redisson(config);
    return redisson;
}
// 实例化Redisson
protected Redisson(Config config) {
    this.config = config;
    Config configCopy = new Config(config);
    // #3
    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
public static ConnectionManager createConnectionManager(Config configCopy) {
    UUID id = UUID.randomUUID();

    if (configCopy.getMasterSlaveServersConfig() != null) {
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
    } else if (configCopy.getSingleServerConfig() != null) {
        // #4 单机模式
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
    } else {
        throw new IllegalArgumentException("server(s) address(es) not defined!");
    }
}
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
    // #5
    super(create(cfg), config, id);
}

super调用父类MasterSlaveConnectionManager构造器

public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
    this(config, id);
    this.config = cfg;

    initTimer(cfg);
    // #6 内部创建Netty客户端
    initSingleEntry();
}
protected void initSingleEntry() {
   try {
       MasterSlaveEntry entry;
       if (config.checkSkipSlavesInit()) {
           entry = new SingleEntry(this, config);
       } else {
           entry = createMasterSlaveEntry(config);
       }
       // #7
       RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
       f.syncUninterruptibly();

   } catch (RuntimeException e) {
       stopThreads();
       throw e;
   }
}
public RFuture<RedisClient> setupMasterEntry(URI address) {
    // #8 创建RedisClient
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
    // #9
    return setupMasterEntry(client);
}

在#8处会创建RedisClient,通过名字可以猜到,它是一个客户端对象,在它的内部有一个用于连接服务端的Netty的Bootstrap对象

private RedisClient(RedisClientConfig config) {
    RedisClientConfig copy = new RedisClientConfig(config);
    channels = new DefaultChannelGroup(copy.getGroup().next());
    // 创建Bootstrap
    bootstrap = createBootstrap(copy, Type.PLAIN);
    pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);

    this.commandTimeout = copy.getCommandTimeout();
}

在#9处便会通过Bootstrap对象连接服务端了

private RFuture<RedisClient> setupMasterEntry(final RedisClient client) {
    final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
    RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
    addrFuture.addListener(new FutureListener<InetSocketAddress>() {

        @Override
        public void operationComplete(Future<InetSocketAddress> future) throws Exception {
            masterEntry = new ClientConnectionsEntry(
                client, 
                config.getMasterConnectionMinimumIdleSize(), 
                config.getMasterConnectionPoolSize(),
                config.getSubscriptionConnectionMinimumIdleSize(),
                config.getSubscriptionConnectionPoolSize(), 
                connectionManager, 
                NodeType.MASTER);

            // #10 内部会连接服务端
            RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);

            if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
                RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
            }
        }
    });

    return result;
}
public RFuture<Void> add(final ClientConnectionsEntry entry) {
    final RPromise<Void> promise = new RedissonPromise<Void>();
    promise.addListener(new FutureListener<Void>() {
        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess()) {
                entries.add(entry);
            }
        }
    });
    // #11 内部会连接服务端
    initConnections(entry, promise, true);
    return promise;
}
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {
    final int minimumIdleSize = getMinimumIdleSize(entry);
    final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
    int startAmount = Math.min(50, minimumIdleSize);
    final AtomicInteger requests = new AtomicInteger(startAmount);
    
    for (int i = 0; i < startAmount; i++) {
        // #12 创建连接
        createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
    }
}
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,
            final int minimumIdleSize, final AtomicInteger initializedConnections) {

    // #13 获取连接
    acquireConnection(entry, new Runnable() {
        @Override
        public void run() {
            RPromise<T> promise = new RedissonPromise<T>();
            // #14 创建连接
            createConnection(entry, promise);
            promise.addListener(new FutureListener<T>() {
                @Override
                public void operationComplete(Future<T> future) throws Exception {

                }
            });
        }
    });

}
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
    // #15 连接
    RFuture<T> connFuture = connect(entry);
}
protected RFuture<T> connect(ClientConnectionsEntry entry) {
    return (RFuture<T>) entry.connect();
}
public RFuture<RedisConnection> connect() {
    // #16 这里的client就是之前创建的RedisClient
    RFuture<RedisConnection> future = client.connectAsync();
    
    return future;
}
public RFuture<RedisConnection> connectAsync() {
    final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

    RFuture<InetSocketAddress> addrFuture = resolveAddr();
    addrFuture.addListener(new FutureListener<InetSocketAddress>() {
        @Override
        public void operationComplete(Future<InetSocketAddress> future) throws Exception {
            if (!future.isSuccess()) {
                f.tryFailure(future.cause());
                return;
            }

            // #17 连接服务端
            ChannelFuture channelFuture = bootstrap.connect(future.getNow());
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture future) throws Exception {
                    
                }
            });
        }
    });

    return f;
}

在#17处通过之前创建的Bootstrap对象,调用connect方法连接服务端,至此客户端就与服务端建立了连接,之后需要发送给服务端的命令,都通过这个建立好的连接发送出去.

Netty在Redis客户端中的应用,第1张
20200910063106.png

最后系统中会多出许多'redisson-netty-1-x'命名的线程.它们都是已经和服务端建立好了连接,随时都可以进行通信.


https://www.xamrdz.com/backend/3j21940381.html

相关文章: