深入Jedis

huangy 9年前發布 | 14K 次閱讀 Jedis Redis NoSQL數據庫

來自: https://blog.huachao.me/2016/2/深入Jedis/

RESP(REdis Serialization Protocol)是redis server與redis client的通信協議。

  • TCP Port 6379
  • Request-Response模型。2個例外,1)pipeline;2)pub/sub
  • 5種DataType,Simple String(+);Errors(-);Integers(:);Bulk String($);Arrays(*)
  • \r\n (CRLF)是結束符
  • Simple String 例子: "+OK\r\n"
  • Errors 例子: -WRONGTYPE Operation against a key holding the wrong kind of value
  • Integer 例子: ":1000\r\n"
  • Bulk String 例子: "$6\r\nfoobar\r\n" 6表示后面有6個byte的長度
  • Null 例子: "$-1\r\n"
  • Arrays 例子: "*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n" 2表示有2個元素; "*0\r\n" 表示空數組
  • 客戶端發送命令:就是Bulk String。例子: llen mylist -> *2\r\n$4\r\nllen\r\n$6\r\nmylist\r\n
  • redis服務器回答RESP DataType。例子: :48293\r\n

Jedis對RESP協議的抽象

  • Protocol是實現上述RESP協議的主要類,其中可以看到 sendCommand(final RedisOutputStream os, final byte[] command, final byte[]... args) 是如何根據協議拼接字符串發送到redis server, Object read(final RedisInputStream is) 是如何接收redis server的返回,并且轉換為Java Object。
  • BinaryXxxCommands <- BinaryJedis, XxxCommands <- Jedis 用來抽象所有通過二進制流來發送的Redis命令
  • XxxCommands <- Jedis 用來抽象類似ClusterCommands的命令,最終都是走的二進制流,去掉Binary一層估計是作者覺得厭煩了。不對之處還請賜教。
  • Commands, Connection <- BinaryClient <- Client 抽象了網絡發送命令和接收回復,其中Client將參數encode為byte[],然后調用BinaryClient的方法;BinaryClient調用Connection#sendCommand;sendCommand調用connect(),構造RedisInputStream和RedisOutputStream,用Protocol.sendCommand來發送命令;client.getXxxReply()首先將outputstream中的內容flush出去,然后調用Protocol.read來處理接收到的返回值。

    /* 發送命令 Connection.java */
    protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
        try {
          connect();
          Protocol.sendCommand(outputStream, cmd, args);
          pipelinedCommands++;
          return this;
        } catch (JedisConnectionException ex) {
          // Any other exceptions related to connection?
          broken = true;
          throw ex;
        }
    }
    
    public void connect() {
        if (!isConnected()) {
          try {
            socket = new Socket();
            // ->@wjw_add
            socket.setReuseAddress(true);
            socket.setKeepAlive(true); // Will monitor the TCP connection is
            // valid
            socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
            // ensure timely delivery of data
            socket.setSoLinger(true, 0); // Control calls close () method,
            // the underlying socket is closed
            // immediately
            // <-@wjw_add
    
            socket.connect(new InetSocketAddress(host, port), connectionTimeout);
            socket.setSoTimeout(soTimeout);
            outputStream = new RedisOutputStream(socket.getOutputStream());
            inputStream = new RedisInputStream(socket.getInputStream());
          } catch (IOException ex) {
            broken = true;
            throw new JedisConnectionException(ex);
          }
        }
    }
    
    /* 接收回復 */
    public String getBulkReply() {
        final byte[] result = getBinaryBulkReply();
        if (null != result) {
          return SafeEncoder.encode(result);
        } else {
          return null;
        }
    }
    
    public byte[] getBinaryBulkReply() {
        flush();
        pipelinedCommands--;
        return (byte[]) readProtocolWithCheckingBroken();
    }
    
    protected Object readProtocolWithCheckingBroken() {
        try {
          return Protocol.read(inputStream);
        } catch (JedisConnectionException exc) {
          broken = true;
          throw exc;
        }
    }
  • Jedis通過Pipeline這個類來對redis的pipeline進行抽象, jedis.pipelined() 返回一個Pipeline實例,并且這個Pipeline實例的client就是當前jedis實例的client;調用 pipeline.a_redis_command() 的時候會有一個 responseList ,用來記錄每個command應該對應的response; pipeline.syncAndReturnAll() 會調用 client.getAll() 將所有command一次 flush() 出去,然后拿回List<Object>,再將這些Object填充到responseList中。

Jedis使用注意事項

  • Jedis instance本身不是線程安全的!要用JedisPool

    //將JedisPool定義為spring單例
    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost");
    
    Jedis jedis = null;
    try {
      jedis = pool.getResource();
      /// ... do stuff here ... for example
      jedis.set("foo", "bar");
      String foobar = jedis.get("foo");
      jedis.zadd("sose", 0, "car"); jedis.zadd("sose", 0, "bike"); 
      Set<String> sose = jedis.zrange("sose", 0, -1);
    } finally {
      if (jedis != null) {
        jedis.close();
      }
    }
    /// ... when closing your application:
    pool.destroy();

    </div> </li>

  • JedisPool是一個包裝模式,內部就是Apache Common Pool 2, Pool里面裝的是Jedis。Jedis之所以不是線程安全的主要是由于Jedis類中的fields(client, pipeline, transaction)沒有做同步。如果每個thread都有一份Jedis實例,其實也不存在線程安全問題,就是要注意使用完了需要 jedis.close() 。JedisPool和DBCP的Pool一樣,就是用來創建Jedis實例,然后提供給線程使用,Pool技術能夠復用已經標記為IDLE的Jedis,以此來提供內存利用率和減小開銷。

  • </ul> </div>

 本文由用戶 huangy 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!