Hadoop2源码分析-RPC探索实战(hadoop源码编译的作用)

  本篇文章为你整理了Hadoop2源码分析-RPC探索实战(hadoop源码编译的作用)的详细内容,包含有hadoop 2.x hdfs源码剖析 hadoop源码编译的作用 hadoop2.x采用什么技术构建源代码 hadoop开源 Hadoop2源码分析-RPC探索实战,希望能帮助你了解 Hadoop2源码分析-RPC探索实战。

   在《Hadoop2源码分析-RPC机制初识》博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO、动态代理与反射等。本篇博客介绍的内容目录如下所示:

  Java NIO简述

  Java NIO实例演示

  动态代理与反射简述

  动态代理与反射实例演示

  Hadoop V2 RPC框架使用实例

   下面开始今天的博客介绍。

  2.Java NIO简述

   Java NIO又称Java New IO,它替代了Java IO API,提供了与标准IO不同的IO工作方式。Java NIO由一下核心组件组成:

  Channels:连接通道,即能从通道读取数据,又能写数据到通道。可以异步读写,读写从Buffer开始。

  Buffers:消息缓冲区,用于和NIO通道进行交互。所谓缓冲区,它是一块可以读写的内存,该内存被封装成NIO的Buffer对象,并提供相应的方法,以便于访问。

  Selectors:通道管理器,它能检测到Java NIO中多个通道,单独的线程可以管理多个通道,间接的管理多个网络连接。

   下图为Java NIO的工作原理图,如下图所示:

  3.Java NIO实例演示

  NIOServer

   首先,我们来看NIOServer的代码块。代码内容如下所示:

  

package cn.hadoop.nio;

 

  import java.io.IOException;

  import java.net.InetSocketAddress;

  import java.nio.ByteBuffer;

  import java.nio.channels.SelectionKey;

  import java.nio.channels.Selector;

  import java.nio.channels.ServerSocketChannel;

  import java.nio.channels.SocketChannel;

  import java.util.Iterator;

  import org.slf4j.Logger;

  import org.slf4j.LoggerFactory;

  import cn.hadoop.conf.ConfigureAPI;

   * @Date May 8, 2015

   * @Author dengjie

   * @Note Defined nio server

  public class NIOServer {

   private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);

   // The channel manager

   private Selector selector;

   * Get ServerSocket channel and initialize

   * 1.Get a ServerSocket channel

   * 2.Set channel for non blocking

   * 3.The channel corresponding to the ServerSocket binding to port port

   * 4.Get a channel manager

   * 5.The channel manager and the channel binding, and the channel registered

   * SelectionKey.OP_ACCEPT event

   * @param port

   * @throws IOException

   public void init(int port) throws IOException {

   ServerSocketChannel serverChannel = ServerSocketChannel.open();

   serverChannel.configureBlocking(false);

   serverChannel.socket().bind(new InetSocketAddress(port));

   this.selector = Selector.open();

   serverChannel.register(selector, SelectionKey.OP_ACCEPT);

   * listen selector

   * @throws IOException

   public void listen() throws IOException {

   LOGGER.info("Server has start success");

   while (true) {

   selector.select();

   Iterator SelectionKey ite = this.selector.selectedKeys().iterator();

   while (ite.hasNext()) {

   SelectionKey key = (SelectionKey) ite.next();

   ite.remove();

   if (key.isAcceptable()) {

   ServerSocketChannel server = (ServerSocketChannel) key.channel();

   SocketChannel channel = server.accept();

   channel.configureBlocking(false);// 非阻塞

   channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes()));

   channel.register(this.selector, SelectionKey.OP_READ);// 设置读的权限

   } else if (key.isReadable()) {

   read(key);

   * Deal client send event

   public void read(SelectionKey key) throws IOException {

   SocketChannel channel = (SocketChannel) key.channel();

   ByteBuffer buffer = ByteBuffer.allocate(1024);

   channel.read(buffer);

   byte[] data = buffer.array();

   String info = new String(data).trim();

   LOGGER.info("Server receive info : " + info);

   ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());

   channel.write(outBuffer);// 将消息回送给客户端

   public static void main(String[] args) {

   try {

   NIOServer server = new NIOServer();

   server.init(ConfigureAPI.ServerAddress.NIO_PORT);

   server.listen();

   } catch (Exception ex) {

   ex.printStackTrace();

   LOGGER.error("NIOServer main run error,info is " + ex.getMessage());

  }

 

  
public void init(String ip, int port) throws Exception {

   SocketChannel channel = SocketChannel.open();

   channel.configureBlocking(false);

   this.selector = Selector.open();

   channel.connect(new InetSocketAddress(ip, port));

   channel.register(selector, SelectionKey.OP_CONNECT);

   * listen selector

   public void listen() throws Exception {

   while (true) {

   selector.select();

   Iterator SelectionKey ite = this.selector.selectedKeys().iterator();

   while (ite.hasNext()) {

   SelectionKey key = (SelectionKey) ite.next();

   ite.remove();

   if (key.isConnectable()) {

   SocketChannel channel = (SocketChannel) key.channel();

   if (channel.isConnectionPending()) {

   channel.finishConnect();

   channel.configureBlocking(false);// 非阻塞

   channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes()));

   channel.register(this.selector, SelectionKey.OP_READ);

   } else if (key.isReadable()) {

   read(key);

   * Deal client send event

   public void read(SelectionKey key) throws IOException {

   SocketChannel channel = (SocketChannel) key.channel();

   ByteBuffer buffer = ByteBuffer.allocate(1024);

   channel.read(buffer);

   byte[] data = buffer.array();

   String info = new String(data).trim();

   LOGGER.info("Client receive info : " + info);

   ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());

   channel.write(outBuffer);

   public static void main(String[] args) {

   try {

   NIOClient client = new NIOClient();

   client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);

   client.listen();

   } catch (Exception ex) {

   ex.printStackTrace();

   LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());

  }

 

 

  
public static final int NIO_PORT = 8888;

   public static final String NIO_IP = "127.0.0.1";

  }

 

 

  4.动态代理和反射简述

   在Java中,动态代理主要用来做方法的增强,可以在不修改源码的情况下,增强一些方法。另外,还有一个作用就是做远程调用,比如现在有Java接口,该接口的实现部署在非本地服务器上,在编写客户端代码时,由于没法直接生成该对象,这个时候就需要考虑使用动态代理了。

   而反射,利用了Class类作为反射实例化对象的基本应用,对于一个实例化对象而言,它需要调用类中的构造方法,属性和一般方法,这些操作都可以通过反射机制来完成。下面我们用一个实例来理解这些理论。

  5.动态代理和反射实例演示

  5.1动态代理

  JProxy

  
public static void main(String[] args) {

   JInvocationHandler ji = new JInvocationHandler();

   Subject sub = (Subject) ji.bind(new RealSubject());

   System.out.println(sub.say("dengjie", 25));

  interface Subject {

   public String say(String name, int age);

  class RealSubject implements Subject {

   @Override

   public String say(String name, int age) {

   return name + "," + age;

  class JInvocationHandler implements InvocationHandler {

   private Object object = null;

   public Object bind(Object object) {

   this.object = object;

   return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);

   @Override

   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

   Object tmp = method.invoke(this.object, args);

   return tmp;

  }

 

 

  5.2反射

  JReflect

  
public static void main(String[] args) {

   Fruit f = Factory.getInstance(Orange.class.getName());

   if (f != null) {

   f.eat();

  interface Fruit {

   public abstract void eat();

  class Apple implements Fruit {

   @Override

   public void eat() {

   System.out.println("apple");

  class Orange implements Fruit {

   @Override

   public void eat() {

   System.out.println("orange");

  class Factory {

   public static Fruit getInstance(String className) {

   Fruit f = null;

   try {

   f = (Fruit) Class.forName(className).newInstance();

   } catch (Exception e) {

   e.printStackTrace();

   return f;

  }

 

 

  6.Hadoop V2 RPC框架使用实例

   本实例主要演示通过Hadoop V2的RPC框架实现一个计算两个整数的Add和Sub,服务接口为CaculateService ,继承于 VersionedProtocol ,具体代码如下所示:

  CaculateService

  
import org.apache.hadoop.ipc.ProtocolInfo;

  import org.apache.hadoop.ipc.VersionedProtocol;

  import cn.hadoop.conf.ConfigureAPI;

   * @Date May 7, 2015

   * @Author dengjie

   * @Note Data calculate service interface

  @ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)

  public interface CaculateService extends VersionedProtocol {

   // defined add function

   public IntWritable add(IntWritable arg1, IntWritable arg2);

   // defined sub function

   public IntWritable sub(IntWritable arg1, IntWritable arg2);

  }

 

 

   注意,本工程使用的是Hadoop-2.6.0版本,这里CaculateService接口需要加入注解,来声明版本号。

   CaculateServiceImpl类实现CaculateService接口。代码如下所示:

  CaculateServiceImpl

  
import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.ipc.ProtocolSignature;

  import cn.hadoop.conf.ConfigureAPI;

  import cn.hadoop.service.CaculateService;

   * @Date May 7, 2015

   * @Author dengjie

   * @Note Implements CaculateService class

  public class CaculateServiceImpl implements CaculateService {

   public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {

   return this.getProtocolSignature(arg0, arg1, arg2);

   * Check the corresponding version

   public long getProtocolVersion(String arg0, long arg1) throws IOException {

   return ConfigureAPI.VersionID.RPC_VERSION;

   * Add nums

   public IntWritable add(IntWritable arg1, IntWritable arg2) {

   return new IntWritable(arg1.get() + arg2.get());

   * Sub nums

   public IntWritable sub(IntWritable arg1, IntWritable arg2) {

   return new IntWritable(arg1.get() - arg2.get());

  }

 

 

   CaculateServer服务类,对外提供服务,具体代码如下所示:

  CaculateServer

  
import cn.hadoop.service.CaculateService;

  import cn.hadoop.service.impl.CaculateServiceImpl;

   * @Date May 7, 2015

   * @Author dengjie

   * @Note Server Main

  public class CaculateServer {

   private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);

   public static final int IPC_PORT = 9090;

   public static void main(String[] args) {

   try {

   Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class)

   .setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build();

   server.start();

   LOGGER.info("CaculateServer has started");

   System.in.read();

   } catch (Exception ex) {

   ex.printStackTrace();

   LOGGER.error("CaculateServer server error,message is " + ex.getMessage());

  }

 

 

   注意,在Hadoop V2版本中,获取RPC下的Server对象不能在使用RPC.getServer()方法了,该方法已被移除,取而代之的是使用Builder方法来构建新的Server对象。

   RPCClient客户端类,用于访问Server端,具体代码实现如下所示:

  RPCClient

  
public static void main(String[] args) {

   InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);

   try {

   RPC.getProtocolVersion(CaculateService.class);

   CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class,

   RPC.getProtocolVersion(CaculateService.class), addr, new Configuration());

   int add = service.add(new IntWritable(2), new IntWritable(3)).get();

   int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();

   LOGGER.info("2+3=" + add);

   LOGGER.info("5-2=" + sub);

   } catch (Exception ex) {

   ex.printStackTrace();

   LOGGER.error("Client has error,info is " + ex.getMessage());

  }

 

 

   Hadoop V2 RPC服务端截图预览,如下所示:

   Hadoop V2 RPC客户端截图预览,如下所示:

   Hadoop V2 RPC框架对Socket通信进行了封装,定义了自己的基类接口VersionProtocol。该框架需要通过网络以序列化的方式传输对象,关于Hadoop V2的序列化可以参考《Hadoop2源码分析-序列化篇》,传统序列化对象较大。框架内部实现了基于Hadoop自己的服务端对象和客户端对象。服务端对象通过new RPC.Builder().builder()的方式来获取,客户端对象通过RPC.getProxy()的方式来获取。并且都需要接受Configuration对象,该对象实现了Hadoop相关文件的配置。

  8.结束语

   这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  以上就是Hadoop2源码分析-RPC探索实战(hadoop源码编译的作用)的详细内容,想要了解更多 Hadoop2源码分析-RPC探索实战的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: