Java領(lǐng)域要說(shuō)讓我最服氣的RPC框架當(dāng)屬Dubbo,原因有許多,但是最吸引我的還是它把遠(yuǎn)程調(diào)用這個(gè)事情設(shè)計(jì)得很有藝術(shù)。
Java領(lǐng)域要說(shuō)讓我最服氣的RPC框架當(dāng)屬Dubbo,原因有許多,但是最吸引我的還是它把遠(yuǎn)程調(diào)用這個(gè)事情設(shè)計(jì)得很有藝術(shù)。
業(yè)內(nèi)對(duì)于微服務(wù)之間調(diào)用的框架選擇較多,主流是Spring Cloud的Rest方式 和 Dubbo方式,我使用Dubbo方式居多。Dubbo工業(yè)級(jí)可用,穩(wěn)定又高效,深受各大公司研發(fā)同學(xué)的喜愛(ài)。
Dubbo的優(yōu)點(diǎn)較多,比如:
但是,Dubbo最吸引人的,
半支煙
覺(jué)得反而倒是它的RPC調(diào)用。Dubbo的定位是一個(gè)RPC框架,這是它的核心和立足之地,所以Dubbo將RPC的調(diào)用過(guò)程透明化,使得開(kāi)發(fā)者可以專注于業(yè)務(wù)邏輯,而不用關(guān)注底層通信問(wèn)題。
一個(gè)RPC框架只有聚焦于先做好它的RPC調(diào)用過(guò)程這個(gè)模塊,才會(huì)有人關(guān)注,其余的優(yōu)點(diǎn)都是在這之后,慢慢迭代而來(lái)。
作者將RPC調(diào)用的這個(gè)過(guò)程,抽象成 一種協(xié)議消息的傳輸機(jī)制 ,再通過(guò) 控制好線程的等待和喚醒 ,來(lái)實(shí)現(xiàn)遠(yuǎn)程方法調(diào)用。這一設(shè)計(jì)思路真是美妙,充分體驗(yàn)了作者的智慧。
學(xué)Dubbo,首先就是要學(xué)習(xí)作者這種設(shè)計(jì)理念和思路;诖耍瑏(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)易的遠(yuǎn)程方法調(diào)用,將Dubbo的RPC過(guò)程簡(jiǎn)易化。
簡(jiǎn)易的RPC過(guò)程步驟如下,大致分5步,依舊使用Netty作用Socket通訊工具。
public class BProcessServer {
private final int port;
public BProcessServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new BProcessServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("B啟動(dòng)了服務(wù),端口號(hào): " + port);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new BProcessServer(8088).start();
}
}
public class BProcessServerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String reqData = msg.toString(CharsetUtil.UTF_8);
System.out.println("B進(jìn)程接受到了請(qǐng)求數(shù)據(jù): " + reqData);
executeMethod(ctx);
}
/**
* 執(zhí)行方法
*
* @param ctx
* @throws InterruptedException
*/
private void executeMethod(ChannelHandlerContext ctx) throws InterruptedException {
// TODO 將請(qǐng)求消息按照某種規(guī)則解析成方法名、方法參數(shù)等,其實(shí)就是反序列化的過(guò)程。
System.out.println("對(duì)接受的數(shù)據(jù)做反序列化,然后開(kāi)始執(zhí)行 消息體里指定的方法...");
// 模擬方法執(zhí)行
Thread.sleep(2000);
System.out.println("執(zhí)行完畢,返回結(jié)果...");
// 將結(jié)果 通知給 A 進(jìn)程
ByteBuf dataByteBuf = ctx.alloc().buffer().writeBytes("Task completed".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(dataByteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
public class AProcessClient {
private final String host;
private final int port;
private final Object lock = new Object(); // 監(jiān)視器對(duì)象
public AProcessClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AProcessClientHandler(lock));
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
System.out.println("A進(jìn)程與B進(jìn)程建立了通信連接");
Channel channel = future.channel();
// 發(fā)起遠(yuǎn)程調(diào)用
callRemoteMethod(channel);
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
/**
* 執(zhí)行方法
*
* @param channel
* @throws InterruptedException
*/
private void callRemoteMethod(Channel channel) throws InterruptedException {
//TODO 此處需要將調(diào)用的方法和參數(shù),按照協(xié)議進(jìn)行序列化。這次暫且省去此過(guò)程。
System.out.println("A進(jìn)程將 請(qǐng)求的方法和參數(shù) 進(jìn)行序列化,然后向B進(jìn)程發(fā)起網(wǎng)絡(luò)調(diào)用...");
ByteBuf dataByteBuf = channel.alloc().buffer().writeBytes("Start call method".getBytes(CharsetUtil.UTF_8));
channel.writeAndFlush(dataByteBuf);
// 使用wait等待B進(jìn)程通知
synchronized (lock) {
System.out.println("A進(jìn)程等待B進(jìn)程的響應(yīng)...");
lock.wait(); // 等待通知
}
System.out.println("A進(jìn)程收到了B進(jìn)程的響應(yīng)通知,繼續(xù)往下...");
}
public static void main(String[] args) throws InterruptedException {
new AProcessClient("localhost", 8088).start();
}
}
lock.wait()
以后的代碼得以繼續(xù)執(zhí)行。
public class AProcessClientHandler extends SimpleChannelInboundHandler {
private final Object lock;
public AProcessClientHandler(Object lock) {
this.lock = lock;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String resData = msg.toString(CharsetUtil.UTF_8);
System.out.println("A進(jìn)程接受到了響應(yīng)數(shù)據(jù): " + resData);
// B 進(jìn)程任務(wù)完成,使用 notify 喚醒等待的線程
synchronized (lock) {
lock.notify(); // 喚醒 A 進(jìn)程
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Dubbo的優(yōu)秀設(shè)計(jì)思路有許多,我只鐘情其一,那就是RPC的調(diào)用過(guò)程。以上是一個(gè)簡(jiǎn)易的RPC遠(yuǎn)程調(diào)用的示例,用于理解Dubbo的原理和源碼,希望對(duì)你有幫助!
本篇完結(jié)!歡迎 關(guān)注、加V(yclxiao)交流、全網(wǎng)可搜(程序員半支煙)
原文鏈接: https://mp.weixin.qq.com/s/J0fzDH-iqGnnnjqaXMLs-A
機(jī)器學(xué)習(xí):神經(jīng)網(wǎng)絡(luò)構(gòu)建(下)
閱讀華為Mate品牌盛典:HarmonyOS NEXT加持下游戲性能得到充分釋放
閱讀實(shí)現(xiàn)對(duì)象集合與DataTable的相互轉(zhuǎn)換
閱讀鴻蒙NEXT元服務(wù):論如何免費(fèi)快速上架作品
閱讀算法與數(shù)據(jù)結(jié)構(gòu) 1 - 模擬
閱讀基于鴻蒙NEXT的血型遺傳計(jì)算器開(kāi)發(fā)案例
閱讀5. Spring Cloud OpenFeign 聲明式 WebService 客戶端的超詳細(xì)使用
閱讀Java代理模式:靜態(tài)代理和動(dòng)態(tài)代理的對(duì)比分析
閱讀Win11筆記本“自動(dòng)管理應(yīng)用的顏色”顯示規(guī)則
閱讀本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請(qǐng)發(fā)郵件[email protected]
湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)