当前位置: 首页>后端>正文

jedis实现分布式锁

通过此篇文章可以了解Redis的底层通信,Redis的协议,以及自己手写与服务器通信.

在分布式锁的实现上, 基于Redis的实现是其中一种.

而具体的实现依赖包又有两个

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.1</version>
</dependency>

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.10.1</version>
</dependency>

本篇文章我们就讲解第一种Jedis.

基于Jedis又有两种实现

// 单机
Jedis(String host, int port, int connectionTimeout, int soTimeout)

// 集群
JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig)

不管是单机还是集群,它们的底层和服务器之间的通信都是基于java.net.Socket

比如说我们通过Jedis(String host, int port, int connectionTimeout, int soTimeout)构造了一个客户端,连接单机的服务器.

public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout) {
    // 调用父类BinaryJedis
    super(host, port, connectionTimeout, soTimeout);
}
public BinaryJedis(final String host, final int port, final int connectionTimeout,
      final int soTimeout) {
    // 创建一个Client,它并没有连接服务器,只是先保存了host,port. 在Client类内部有个Socket属性.
    client = new Client(host, port);
    client.setConnectionTimeout(connectionTimeout);
    client.setSoTimeout(soTimeout);
}

调用首次调用setnx向服务器发送命令时,会连接服务器

public Long setnx(final String key, final String value) {
    checkIsInMultiOrPipeline();
    // 调用上面构造好的client的setnx方法
    client.setnx(key, value);
    return client.getIntegerReply();
  }

最后会跟进到如下代码

protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
        // 连接服务器
        connect();
        // 发送指令
        Protocol.sendCommand(outputStream, cmd, args);
        pipelinedCommands++;
        return this;
    } catch (JedisConnectionException ex) {
        // ...
    }
}

跟进到connect()方法

public void connect() {
    // 判断socket是否已连接,如果没有连接服务器则连接服务器
    if (!isConnected()) {
        try {
            socket = new Socket();
            socket.setReuseAddress(true);
            socket.setKeepAlive(true);
            socket.setTcpNoDelay(true);
            socket.setSoLinger(true, 0);
            socket.connect(new InetSocketAddress(host, port), connectionTimeout);
            socket.setSoTimeout(soTimeout);
            // 输出流
            outputStream = new RedisOutputStream(socket.getOutputStream());
            // 输入流
            inputStream = new RedisInputStream(socket.getInputStream());
        } catch (IOException ex) {
            broken = true;
            throw new JedisConnectionException(ex);
        }
    }
}

这就是最熟悉的通过Socket和服务器进行通信,还有输入输出流.

连接好服务器之后,接下来就是发送命令给服务器了.

跟进到发送命令的代码

private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    // 通过输出流向服务器发送数据
    try {
        os.write(ASTERISK_BYTE);
        os.writeIntCrLf(args.length + 1);
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(command.length);
        os.write(command);
        os.writeCrLf();

        for (final byte[] arg : args) {
            os.write(DOLLAR_BYTE);
            os.writeIntCrLf(arg.length);
            os.write(arg);
            os.writeCrLf();
        }
    } catch (IOException e) {
        throw new JedisConnectionException(e);
    }
}

通过输出流,将命令发送给服务器.

下面我们将这个方法里面的常量替换一下,把一些不必要的代码删除,再看下这个方法

// #1
os.write('*');
os.write(args.length + 1);
os.write("\r\n");

// #2
os.write('$');
os.write(command.length);
os.write("\r\n");
os.write(command);
os.write("\r\n");

// #3
for (final byte[] arg : args) {
    os.write('$');
    os.write(arg.length);
    os.write("\r\n");
    os.write(arg);
    os.write("\r\n");
}

在说这段代码之前,我们要说下Redis的协议.

互联网的通信是基于协议的,我们熟悉的TCP/IP协议,Dubbo通信的dubbo协议,Zookeeper的zookeeper协议,RocketMQ通信的自身应用层协议.没有协议,那么客户端和服务器就不能通信,彼此'听不懂'对方在说什么.

那么Redis客户端和服务器之间要想彼此知道对方说的什么,那么它们之间也有通过协议通信,这就是Redis协议.

具体协议如下

*<number of arguments> CR LF
$<number of bytes of argument 1> CR LF
<argument data> CR LF
...
$<number of bytes of argument N> CR LF
<argument data> CR LF

比如我们要向服务器发送SET mykey myvalue这个命令,那么转换成协议之后,具体的内容如下

*3

SET

mykey

myvalue

并不是我不把它们写成一行,而是在它们彼此之间有'\r\n',也就是回车换行. 我们简单介绍下它

*3

*这个符号是固定的,那么后面这个3是什么意思呢,3表示后面有3个内容(或者说命令由3部分组成),细心的读者也看到了,整个命令中有3个jedis实现分布式锁,符号,每个,第1张符号代表一个内容.


SET

$符号是固定的,后面的3表示后面有3个字符,因为SET就是3个字符组成的


mykey

$符号是固定的,后面的5表示后面有5个字符,因为mykey就是5个字符组成的


myvalue

$符号是固定的,后面的7表示后面有7个字符,因为myvalue就是7个字符组成的

协议的编解码也是一个话题. Redis的协议是基于长度的,通过长度就可以准确的知道,命令的开始在哪里,结束又在哪里. 基于长度的协议有很多,比如Dubbo或者RocketMQ的协议,它们将数据发送给对方之后,对方就是通过基于长度的解码器,将数据解码出来.

相信读者朋友应该明白了Redis的协议.那么我们只要通过Socket的输出流将这些协议内容发送给服务器就可以了,服务器基于协议,就能'读懂'我们发送给它的命令是什么了.

我们再回到上面的那段代码.

// #1
os.write('*');
os.write(args.length + 1);
os.write("\r\n");

// #2
os.write('$');
os.write(command.length);
os.write("\r\n");
os.write(command);
os.write("\r\n");

// #3
for (final byte[] arg : args) {
    os.write('$');
    os.write(arg.length);
    os.write("\r\n");
    os.write(arg);
    os.write("\r\n");
}

相信这个时候,你再来看这段代码应该就明白了. 就是平铺直叙的将协议'翻译'成代码.

所以说,当我们需要和服务器通信的时候,也未必是必须依赖Redis的依赖包,我们完成可以自己通过Socket与服务器直接通信. 比如下面这段简单的代码,就可以直接和Redis通信了,当然它很简单.这里只是提供给你一个思路.

import java.io.IOException;
import java.net.Socket;

public static void connectRedis(String key, String value) throws IOException {

    Socket client = new Socket(host, port);

    // 执行 set key value命令
    StringBuilder command = new StringBuilder();

    String number = "*3" + CRLF;
    command.append(number);

    String cmd = "" + CRLF + "SET" + CRLF;
    command.append(cmd);

    cmd = "$" + key.getBytes().length + CRLF + key + CRLF;
    command.append(cmd);

    cmd = "$" + value.getBytes().length + CRLF + value + CRLF;
    command.append(cmd);

    // 向服务器发送命令
    client.getOutputStream().write(command.toString().getBytes());

    // 接收服务器响应
    byte[] response = new byte[1024];
    client.getInputStream().read(response);
    System.out.println(new String(response, 0, response.length));

}

在开篇我们也讲到,Redis分布式锁的实现有jedis和redisson两种. jedis的底层通信是直接基于Socket, 而redisson的底层与服务器通信是基于Netty.


https://www.xamrdz.com/backend/3yj1945695.html

相关文章: