WeBankFinTech / WeDataSphere

WeDataSphere is a financial grade, one-stop big data platform suite.
661 stars 162 forks source link

【有奖征文】DSS+Linkis在知因智慧使用情况 #15

Open dlimeng opened 4 years ago

dlimeng commented 4 years ago

一.应用场景

首先感谢社区各位大佬的指点,学习到很多。

知因智慧是一家toB金融公司,里面需要大量的ETL过程,原先用Shell脚本连接各种Hql,Spark等等,XXL- Job调度,可能一个模块就被一个大的脚本包含住了,耦合性特别强,调度这块也有问题,无法监控中间的报错,2019下半年时看到社区开源组件,一直研究怎么跟公司整合。

希望借助社区的力量,结合公司实际情况,打通公司级数据中台的流程,目前数据建设主要集中在元数据管理,数据仓库ETL流程,数据质量,任务调度这几个方面。

二. 解决的问题

基于LDAP服务

基于LDAP管理用户,代理服务模块修改,以组为单位共用账户,公司的整个数据开发人员不多,基于这种方式可以支撑下去。

object LDAPUtils extends Logging {
  val url =  CommonVars("wds.linkis.ldap.proxy.url", "").getValue
  val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue
  def login(userID: String, password: String): Unit = {
    if(userID.isEmpty) throw new NamingException("userID is null")
    val env = new Hashtable[String, String]()
    val bindDN = "uid="+userID+","
    val bindPassword = password
    env.put(Context.SECURITY_AUTHENTICATION, "simple")
    env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory")
    env.put(Context.PROVIDER_URL, url)
    env.put(Context.SECURITY_PRINCIPAL, bindDN+baseDN)
    env.put(Context.SECURITY_CREDENTIALS, bindPassword)
    new InitialLdapContext(env, null)
    info(s"user $userID login success.")
  }
}

放开权限

因为资源有限,有些权限管理很繁琐,放开一些权限:

filesystem.path 存储日志,脚本的权限,统一根据nfs挂载,把目录权限统一根据用户随意修改。

linkis-metadata元数据管理,把Hive相关权限放开(HiveMetaDao.xml):

<select id="getDbsByUser" resultType="java.lang.String" parameterType="java.lang.String" databaseId="mysql">
   select NAME from DBS GROUP BY NAME order by NAME
</select>
<select id="getTablesByDbNameAndUser" resultType="map"  parameterType="map" databaseId="mysql">
    select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
    from TBLS t2, DBS t3 where 1=1 and t2.DB_ID=t3.DB_ID and t3.NAME = #{dbName,jdbcType=VARCHAR}
    order by NAME;
</select>

依赖兼容问题

因为环境是CDH-5.16.2 编译部署DSS和Linkis是根据原生的版本号,大部分服务都没有问题,但是有的服务有些问题,因为CDH会把组件重新编译,有的指令会改变。

原先发生过Hive 和 Spark应该支持的函数,到Scriptis上运行脚本,不支持,这是因为得把两个服务相关的Hive 和 Spark jar 都变成后缀带有CDH的。

image

tez的支持

Linkis Hive引擎对tez的支持:

image

自定义变量的支持:

CustomVariableUtils 工具类中,Shell关枚举都要添加上。

/**
 * @Classname ShellScriptCompaction
 * @Description TODO
 * @Date 2020/8/19 18:22
 * @Created by limeng
 */
class ShellScriptCompaction private extends CommonScriptCompaction{
  override def prefixConf: String = "#conf@set"
  override def prefix: String = "#@set"
  override def belongTo(suffix: String): Boolean ={
    suffix match {
      case "sh"=>true
      case _=>false
    }
  }
}
object ShellScriptCompaction{
  val shellScriptCompaction:ShellScriptCompaction=new ShellScriptCompaction
  def apply(): CommonScriptCompaction = shellScriptCompaction
}

ScriptFsWriter Shell相关 def listCompactions(): Array[Compaction] = Array(PYScriptCompaction(),QLScriptCompaction(),ScalaScriptCompaction(),ShellScriptCompaction()) WorkspaceUtil 工具类正则有问题,无法修改名称,中间有.符号去除

public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException {
    String name = new File(path).getName();
    LOGGER.info(path);
    String specialRegEx = "[ _`~!@#$%^&*()+=|{}':;',\\[\\]<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]|\n|\r|\t";
    Pattern specialPattern = Pattern.compile(specialRegEx);
    if(specialPattern.matcher(name).find()){
        throw new WorkSpaceException("the path exist special char");
    }
}

使用 eventReceiver节点异常(eventchecker组件)#247


EventCheckerNodeExecution.scala
Utils.tryFinally {
          resultSetWriter.addMetaData(null)

          resultSetWriter.addRecord(new LineRecord(action.saveKeyAndValue))

        }(Utils.tryQuietly(resultSetWriter.close()))

      }
      response.setIsSucceed(true)

    }else{

............................................. AppJointEntranceJob.scala override def run(): Unit = { if(!isScheduled) return info(s"$getId starts to run") getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(s"$getId starts to execute."))) startTime = System.currentTimeMillis getExecutor match { case appjointEntranceEngine:AppJointEntranceEngine => appjointEntranceEngine.setJob(this) appjointEntranceEngine.setInstance(Sender.getThisInstance) } Utils.tryAndErrorMsg(transition(Running))(s"transition $getId from Scheduler to Running failed.")

>加入spark streaming

* linkis-ujes-spark-enginemanager引入依赖,spark-streaming相关。
* SparkScalaExecutor.scala bindSparkSession方法引入相关依赖

测试结果
![企业微信截图_16062055657760](https://user-images.githubusercontent.com/16789827/100068209-2eb9a580-2e72-11eb-96e2-3b96f3e8b169.png)

>调试服务

把有问题的服务,bin目录下启动脚本,远程debug打开

![image](https://user-images.githubusercontent.com/16789827/99178324-ec46e900-274c-11eb-8ade-041cd1ea99a2.png)

因为平台Cookie的原因,直接用接口发送请求,有的无法调试:

```java
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
 * @Classname HttpUtil
 * @Description TODO
 * @Date 2020/10/30 14:49
 * @Created by limeng
 */
public class HttpUtil {
    public static RestTemplate getRestClient(){
        CloseableHttpClient build =  HttpClientBuilder.create().useSystemProperties().build();
        return new RestTemplate(new HttpComponentsClientHttpRequestFactory(build));
    }
}
import com.linkis.web.utils.HttpUtil;
import net.sf.json.JSONObject;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
/**
 * @Classname LinkisMain
 * @Description TODO
 * @Date 2020/10/30 14:54
 * @Created by limeng
 * 测试类 EntranceRestfulTest
 */
public class LinkisMain {
    public static void main(String[] args) {
        RestTemplate restClient = HttpUtil.getRestClient();
        JSONObject postData = new JSONObject();
        postData.put("password","hdfs");
        postData.put("userName","hdfs");
        String loginUrl = "http://192.168.200.116:8088/api/rest_j/v1/user/login";
        ResponseEntity<JSONObject> jsonResponseEntity = restClient.postForEntity(loginUrl, postData, JSONObject.class);
        System.out.println("状态码:"+jsonResponseEntity.getStatusCodeValue());
        JSONObject body = jsonResponseEntity.getBody();
        System.out.println("body :" + body.toString());
    }
}

三.最佳实践

我以公司标签库为例,讲述下操作流程。

对企业数据进行挖掘和分析,建立标签特征体系,创建个性化的多层级标签,并在此基础上进行细分和精准营销场景应用,有利于对企业/集团进行深入经营,充分挖掘企业/集团潜力,提升企业/集团价值。以此为目标的标签库的构建,将有利于了解和深耕企业/集团,可更好地助力于企业金融服务。

image

image

这是执行流程图,中间调度过程,Hql,Spark目前运行在DSS平台上。

image

其中一个企业经营特别的跑批流程。

软件版本

CDH-5.16.2

Hadoop-2.6

Hive-1.1

Spark-2.2

kafka_2.11-1.0.1