Netty 模擬 Redis 服務器

ty061032 8年前發布 | 12K 次閱讀 Redis Netty NoSQL數據庫

Redis的客戶端與服務端采用叫做 RESP(Redis Serialization Protocol) 的網絡通信協議交換數據,客戶端和服務器通過 TCP 連接來進行數據交互, 服務器默認的端口號為 6379 。客戶端和服務器發送的命令或數據一律以 \r\n (CRLF) 結尾。

RESP支持五種數據類型:

狀態回復(status reply): 以“+”開頭,表示正確的狀態信息,”+”后就是具體信息?,比如:

redis 127.0.0.1:6379> set ss sdf
OK

其實它真正回復的數據是:+OK\r\n

錯誤回復(error reply):

以”-“開頭,表示錯誤的狀態信息,”-“后就是具體信息,比如:

redis 127.0.0.1:6379> incr ss
(error) ERR value is not an integer or out of range

整數回復(integer reply): 以”:”開頭,表示對某些操作的回復比如DEL, EXISTS, INCR等等

redis 127.0.0.1:6379> incr aa
(integer) 1

批量回復(bulk reply): 以”$”開頭,表示下一行的字符串長度,具體字符串在下一行中

多條批量回復(multi bulk reply): 以”*”開頭,表示消息體總共有多少行(不包括當前行)”*”是具體行數

redis 127.0.0.1:6379> get ss
"sdf"

客戶端->服務器
*2\r\n
$3\r\n
get\r\n
$2\r\n
ss\r\n
服務器->客戶端
$3\r\n
sdf\r\n

注:以上寫的都是XX回復,并不是說協議格式只是適用于服務器->客戶端,客戶端->服務器端也同樣使用以上協議格式,其實雙端協議格式的統一更加方便擴展

回到正題,我們這里是通過netty來模擬redis服務器,可以整理一下思路大概分為這么幾步:

1.需要一個底層的通信框架,這里選擇的是netty4.0.25
2.需要對客戶端穿過來的數據進行解碼(Decoder),其實就是分別處理以上5種數據類型
3.解碼以后我們封裝成更加利于理解的命令(Command),比如:set<name> foo hello<params>
4.有了命令以后就是處理命令(execute),其實我們可以去連接正在的redis服務器,不過這里只是簡單的模擬
5.處理完之后就是封裝回復(Reply),然后編碼(Encoder),需要根據不同的命令分別返回以后5種數據類型
6.測試驗證,通過redis-cli去連接netty模擬的redis服務器,看能否返回正確的結果

以上思路參考github上的一個項目: https://github.com/spullara/redis-protocol ,測試代碼也是在此基礎上做了一個簡化

第一步:通信框架netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.0.25.Final</version>
</dependency>

第二步:數據類型解碼

public class RedisCommandDecoder extends ReplayingDecoder<Void> {

    public static final char CR = '\r';
    public static final char LF = '\n';

    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';

    private byte[][] bytes;
    private int arguments = 0;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        if (bytes != null) {
            int numArgs = bytes.length;
            for (int i = arguments; i < numArgs; i++) {
                if (in.readByte() == DOLLAR_BYTE) {
                    int l = RedisReplyDecoder.readInt(in);
                    if (l > Integer.MAX_VALUE) {
                        throw new IllegalArgumentException(
                                "Java only supports arrays up to "
                                        + Integer.MAX_VALUE + " in size");
                    }
                    int size = (int) l;
                    bytes[i] = new byte[size];
                    in.readBytes(bytes[i]);
                    if (in.bytesBefore((byte) CR) != 0) {
                        throw new RedisException("Argument doesn't end in CRLF");
                    }
                    // Skip CRLF(\r\n)
                    in.skipBytes(2);
                    arguments++;
                    checkpoint();
                } else {
                    throw new IOException("Unexpected character");
                }
            }
            try {
                out.add(new Command(bytes));
            } finally {
                bytes = null;
                arguments = 0;
            }
        } else if (in.readByte() == ASTERISK_BYTE) {
            int l = RedisReplyDecoder.readInt(in);
            if (l > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(
                        "Java only supports arrays up to " + Integer.MAX_VALUE
                                + " in size");
            }
            int numArgs = (int) l;
            if (numArgs < 0) {
                throw new RedisException("Invalid size: " + numArgs);
            }
            bytes = new byte[numArgs][];
            checkpoint();
            decode(ctx, in, out);
        } else {
            in.readerIndex(in.readerIndex() - 1);
            byte[][] b = new byte[1][];
            b[0] = in.readBytes(in.bytesBefore((byte) CR)).array();
            in.skipBytes(2);
            out.add(new Command(b, true));
        }
    }

}

首先通過接受到以“*”開頭的多條批量類型初始化二維數組byte[][] bytes,以讀取到第一個以\r\n結尾的數據作為數組的長度,然后再處理以“$”開頭的批量類型。

以上除了處理我們熟悉的批量和多條批量類型外,還處理了沒有任何標識的數據,其實有一個專門的名字叫 Inline命令:

有些時候僅僅是telnet連接Redis服務,或者是僅僅向Redis服務發送一個命令進行檢測。雖然Redis協議可以很容易的實現,但是使用Interactive sessions 并不理想,而且redis-cli也不總是可以使用。基于這些原因,Redis支持特殊的命令來實現上面描述的情況。這些命令的設計是很人性化的,被稱作Inline 命令。

第三步:封裝command對象

由第二步中可以看到不管是commandName還是params都統一放在了字節二維數組里面,最后封裝在command對象里面

public class Command {
    public static final byte[] EMPTY_BYTES = new byte[0];

    private final Object name;
    private final Object[] objects;
    private final boolean inline;

    public Command(Object[] objects) {
        this(null, objects, false);
    }

    public Command(Object[] objects, boolean inline) {
        this(null, objects, inline);
    }

    private Command(Object name, Object[] objects, boolean inline) {
        this.name = name;
        this.objects = objects;
        this.inline = inline;
    }

    public byte[] getName() {
        if (name != null)
            return getBytes(name);
        return getBytes(objects[0]);
    }

    public boolean isInline() {
        return inline;
    }

    private byte[] getBytes(Object object) {
        byte[] argument;
        if (object == null) {
            argument = EMPTY_BYTES;
        } else if (object instanceof byte[]) {
            argument = (byte[]) object;
        } else if (object instanceof ByteBuf) {
            argument = ((ByteBuf) object).array();
        } else if (object instanceof String) {
            argument = ((String) object).getBytes(Charsets.UTF_8);
        } else {
            argument = object.toString().getBytes(Charsets.UTF_8);
        }
        return argument;
    }

    public void toArguments(Object[] arguments, Class<?>[] types) {
        for (int position = 0; position < types.length; position++) {
            if (position >= arguments.length) {
                throw new IllegalArgumentException(
                        "wrong number of arguments for '"
                                + new String(getName()) + "' command");
            }
            if (objects.length - 1 > position) {
                arguments[position] = objects[1 + position];
            }
        }
    }

}

所有的數據都放在了Object數組里面,而且可以通過getName方法知道Object[0]就是commandName

第四步:執行命令

在經歷了解碼和封裝之后,下面需要實現handler類,用來處理消息

public class RedisCommandHandler extends SimpleChannelInboundHandler<Command> {

    private Map<String, Wrapper> methods = new HashMap<String, Wrapper>();

    interface Wrapper {
        Reply<?> execute(Command command) throws RedisException;
    }

    public RedisCommandHandler(final RedisServer rs) {
        Class<? extends RedisServer> aClass = rs.getClass();
        for (final Method method : aClass.getMethods()) {
            final Class<?>[] types = method.getParameterTypes();
            methods.put(method.getName(), new Wrapper() {
                @Override
                public Reply<?> execute(Command command) throws RedisException {
                    Object[] objects = new Object[types.length];
                    try {
                        command.toArguments(objects, types);
                        return (Reply<?>) method.invoke(rs, objects);
                    } catch (Exception e) {
                        return new ErrorReply("ERR " + e.getMessage());
                    }
                }
            });
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Command msg)
            throws Exception {
        String name = new String(msg.getName());
        Wrapper wrapper = methods.get(name);
        Reply<?> reply;
        if (wrapper == null) {
            reply = new ErrorReply("unknown command '" + name + "'");
        } else {
            reply = wrapper.execute(msg);
        }
        if (reply == StatusReply.QUIT) {
            ctx.close();
        } else {
            if (msg.isInline()) {
                if (reply == null) {
                    reply = new InlineReply(null);
                } else {
                    reply = new InlineReply(reply.data());
                }
            }
            if (reply == null) {
                reply = ErrorReply.NYI_REPLY;
            }
            ctx.write(reply);
        }
    }
}

在實例化handler的時候傳入了一個RedisServer對象,這個方法是真正用來處理redis命令的,理論上這個對象應該支持redis的所有命令,不過這里只是測試所有只提供了2個方法:

public interface RedisServer {

    public BulkReply get(byte[] key0) throws RedisException;

    public StatusReply set(byte[] key0, byte[] value1) throws RedisException;

}

在channelRead0方法中我們可以拿到之前封裝好的command方法,然后通過命令名稱執行操作,這里的RedisServer也很簡單,只是用簡單的hashmap進行臨時的保存數據。

第五步:封裝回復

第四步種我們可以看到處理完命令之后,返回了一個Reply對象

public interface Reply<T> {

    byte[] CRLF = new byte[] { RedisReplyDecoder.CR, RedisReplyDecoder.LF };

    T data();

    void write(ByteBuf os) throws IOException;
}

根據上面提到的5種類型再加上一個inline命令,根據不同的數據格式進行拼接,比如StatusReply:

public void write(ByteBuf os) throws IOException {
    os.writeByte('+');
    os.writeBytes(statusBytes);
    os.writeBytes(CRLF);
}

所以對應Decoder的Encoder就很簡單了:

public class RedisReplyEncoder extends MessageToByteEncoder<Reply<?>> {
    @Override
    public void encode(ChannelHandlerContext ctx, Reply<?> msg, ByteBuf out)
            throws Exception {
        msg.write(out);
    }
}

只需要將封裝好的Reply返回給客戶端就行了

最后一步:測試

啟動類:

public class Main {
    private static Integer port = 6379;

    public static void main(String[] args) throws InterruptedException {
        final RedisCommandHandler commandHandler = new RedisCommandHandler(
                new SimpleRedisServer());

        ServerBootstrap b = new ServerBootstrap();
        final DefaultEventExecutorGroup group = new DefaultEventExecutorGroup(1);
        try {
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100).localAddress(port)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisCommandDecoder());
                            p.addLast(new RedisReplyEncoder());
                            p.addLast(group, commandHandler);
                        }
                    });

            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

ChannelPipeline分別添加了RedisCommandDecoder、RedisReplyEncoder和RedisCommandHandler,同時我們啟動的端口和Redis服務器端口是一樣的也是 6379

打開redis-cli程序:

redis 127.0.0.1:6379> get dsf
(nil)
redis 127.0.0.1:6379> set dsf dsfds
OK
redis 127.0.0.1:6379> get dsf
"dsfds"
redis 127.0.0.1:6379>

從結果可以看出和正常使用redis服務器沒有差別

總結

這樣做的意義其實就是可以把它當做一個 redis代理 ,由這個代理服務器去進行sharding處理,客戶端不直接訪問redis服務器,對客戶端來說,后臺redis集群是完全透明的。

 

來自:http://my.oschina.net/OutOfMemory/blog/738865

 

 本文由用戶 ty061032 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!