DataX二次开发——新增HiveReader插件()

  本篇文章为你整理了DataX二次开发——新增HiveReader插件()的详细内容,包含有 DataX二次开发——新增HiveReader插件,希望能帮助你了解 DataX二次开发——新增HiveReader插件。

  一、研发背景

   DataX官方开源的版本支持HDFS文件的读写,并没有支持基于JDBC的Hive数据读写,很多时候一些数据同步不太方便,比如在读取Hive之前先执行一些sql、读取一些Hive的视图数据、或者在数据同步时执行一段固定的SQL,将SQL执行结果写入下游等各种场景,实际上还是需要Hive插件来支持。而在实际工作中,我们也遇到了类似的一些情况需要二次开发DataX以支持此类场景。本插件已在生产环境稳定运行一年有余,现分享给大家,如有问题也可联系我。

  二、HiveReader插件介绍

   hivereader插件比较简单,共有三个类,两个配置文件。其中:

  HiveReader:实现DataX框架核心方法,是具体逻辑。

  HiveReaderErrorCode:继承了DataX框架的ErrorCode类,是用于统一异常处理DataXException类中调用,具体是新增了一个枚举值。

  HiveConnByKerberos:是在检测到Hive具备Kerberos认证要求时,进行认证的工具类。

  plugin.json:DataX插件固定的配置文件,用于指定插件的入口类。

  plugin_job_template.json:二次开发插件,一般需要提供一下具体的使用方式,此json文件即为HiveReader插件的配置方式说明。

  

   2.1 HiveReader类

   首先是HiveReader类,需要注意的是一些常量或枚举值,需要自行添加,其中DataBaseType枚举类中,需要新增Hive枚举项并添加Hive的驱动类全路径,具体见注释,另外就是Kerberos认证相关的几个配置,一个是keytab的路径,一个是krb5.conf的路径,另外一个是principle的值。

  

package com.alibaba.datax.plugin.reader.hivereader;

 

  import com.alibaba.datax.common.base.Key;

  import com.alibaba.datax.common.plugin.RecordSender;

  import com.alibaba.datax.common.spi.Reader;

  import com.alibaba.datax.common.util.Configuration;

  import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;

  import com.alibaba.datax.rdbms.util.DataBaseType;

  import lombok.extern.slf4j.Slf4j;

  import org.apache.hadoop.security.authentication.util.KerberosName;

  import java.lang.reflect.Field;

  import java.util.List;

  import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根据条件自己取值

  import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 参数名:"fetchSize"

  @Slf4j

  public class HiveReader

   extends Reader

   //此处需现在com.sinosig.plumber.rdbms.util.DataBaseType枚举类中添加Hive类型,内容为:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),

   private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;

   public static class Job

   extends Reader.Job

   private Configuration originalConfig = null;

   private CommonRdbmsReader.Job commonRdbmsReaderJob;

   @Override

   public void init()

   this.originalConfig = getPluginJobConf();

  
Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false);

   if (haveKerberos) {

   log.info("检测到kerberos认证,正在进行认证");

   org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();

   String kerberosKeytabFilePath = this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);

   String kerberosPrincipal = this.originalConfig.getString(Key.KERBEROS_PRINCIPAL);

   String krb5Path = this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH);

   hadoopConf.set("hadoop.security.authentication", "kerberos");

   hadoopConf.set("hive.security.authentication", "kerberos");

   hadoopConf.set("hadoop.security.authorization", "true");

   System.setProperty("java.security.krb5.conf",krb5Path);

   refreshConfig();

   HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path);

   this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);

   this.originalConfig = commonRdbmsReaderJob.init(originalConfig);

  
private Configuration readerSliceConfig;

   private CommonRdbmsReader.Task commonRdbmsReaderTask;

   @Override

   public void init()

   this.readerSliceConfig = getPluginJobConf();

   this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());

   this.commonRdbmsReaderTask.init(this.readerSliceConfig);

   @Override

   public void startRead(RecordSender recordSender)

   int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);

   this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);

   @Override

   public void post()

   this.commonRdbmsReaderTask.post(readerSliceConfig);

   @Override

   public void destroy()

   this.commonRdbmsReaderTask.destroy(readerSliceConfig);

   /** 刷新krb内容信息 */

   public static void refreshConfig() {

   try {

   sun.security.krb5.Config.refresh();

   Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");

   defaultRealmField.setAccessible(true);

   defaultRealmField.set(

   null,

   org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());

   // reload java.security.auth.login.config

   javax.security.auth.login.Configuration.setConfiguration(null);

   } catch (Exception e) {

   log.warn(

   "resetting default realm failed, current default realm will still be used.", e);

  }

 

  2.2 HiveConnByKerberos类

   HiveConnByKerberos类比较简单,是一个通用的Kerberos认证的接口。

  

package com.alibaba.datax.plugin.reader.hivereader;

 

  import com.alibaba.datax.common.exception.PlumberException;

  import lombok.extern.slf4j.Slf4j;

  import org.apache.commons.lang3.StringUtils;

  import org.apache.hadoop.security.UserGroupInformation;

  @Slf4j

  public class HiveConnByKerberos {

   public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) {

   System.setProperty("java.security.krb5.conf",krb5conf);

   if (StringUtils.isNotBlank(kerberosPrincipal) StringUtils.isNotBlank(kerberosKeytabFilePath)) {

   UserGroupInformation.setConfiguration(hadoopConf);

   try {

   UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);

   catch (Exception e) {

   log.error("kerberos认证失败");

   String message = String.format("kerberos认证失败,请检查 " +

   "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]",

   kerberosKeytabFilePath, kerberosPrincipal);

   e.printStackTrace();

   throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);

  }

 

  

  2.3 HiveReaderErrorCode类

   HiveReaderErrorCode类,主要就是集成ErrorCode类,并添加一个枚举项,这块可直接在ErrorCode类添加,也可使用此类,为固定写法。

  

package com.alibaba.datax.plugin.reader.hivereader;

 

  import com.alibaba.datax.common.spi.ErrorCode;

  public enum HiveReaderErrorCode

   implements ErrorCode

   KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS认证失败");

   private final String code;

   private final String description;

   HiveReaderErrorCode(String code, String description)

   this.code = code;

   this.description = description;

   @Override

   public String getCode()

   return this.code;

   @Override

   public String getDescription()

   return this.description;

   @Override

   public String toString()

   return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);

  }

 

  2.4 plugin.json文件

  

{

 

   "name": "hivereader",

   "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",

   "description": "Retrieve data from Hive via jdbc",

   "developer": "wxm"

  }

 

  2.5 plugin_job_template.json文件

   这块需要注意的一个问题是,如果Kerberos认证的Hive连接URL有两种方式,如果是基于zookeeper的方式,则需保证运行DataX服务的节点与zookeeper节点网络是打通的,并且一定不要忘记写上具体的Hive库名。

  

{

 

   "name": "hivereader",

   "parameter": {

   "column": [

   "username": "hive",

   "password": "",
"preSql":"show databases;",

   "connection": [

   "jdbcUrl": [

   "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM"

   "table": [

   "hive_reader"

   "where": "logdate=20211013" ,

   "haveKerberos": true,

   "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab",

   "kerberosPrincipal": "hive@EXAMPLE.COM"

  }

 

  

  以上就是DataX二次开发——新增HiveReader插件()的详细内容,想要了解更多 DataX二次开发——新增HiveReader插件的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: