博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
实例对比 Julia, R, Python,谁是狼语言?
阅读量:4315 次
发布时间:2019-06-06

本文共 3868 字,大约阅读时间需要 12 分钟。

对于一个平台来说,使用者对技术本身是不敏感的,所以我们需要增加一些限制来减少集群的一些不可控情况,例如不断的写入新表/新数据却不记得删除,大量不按规范创建的表名等情况。与此同时应尽量让技术对用户透明,比如让其无感知的访问多种类型的数据库。

下文以拦截 spark.sql() 方法为例,通过为 hive表的添加生命周期,自动切换 tidb 表,表权限校验等几个小功能 来说明。

如何使用

代码
SparkSqlAspect.scala
为了便于理解以下代码会进行一些删减

import org.aspectj.lang.ProceedingJoinPoint

import org.aspectj.lang.annotation.{Around, Aspect}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{Dataset, Row, SparkSession, TiContext}

import cn.tongdun.datacompute.parser._

import cn.tongdun.datacompute.parser.spark.SparkSQLHelper

@Aspect

class SparkSqlAspect {
private val logger = LoggerFactory.getLogger(classOf[SparkSqlAspect])
private var tiContext: TiContext = null

@Around("execution(public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sqlRaw)")

def around(pjp: ProceedingJoinPoint,
sqlRaw: String): Dataset[Row] www.yongshiyule178.com= {
//sparkSQLHelper 是我们基于 antlr4 增加了一些 sparksql 语法的支持,例如建表时需要指定 lifecycle 等
val sql = SparkSQLHelper.format(sqlRaw)
val spark = pjp.getThis.asInstanceOf[SparkSession]
var dataset: Dataset[Row] = spark.emptyDataFrame
val statementData = SparkSQLHelper.getStatementData(sql)
val statement = statementData.getStatement()

//getType 方法用于获取sql的类型

statementData.getType match {
case StatementType.CREATE_TABLE =>
createMethod()

case StatementType.CREATE_TABLE_AS_SELECT =>

createAsSelectMethod()

case StatementType.SELECT =>

dataset = selectMethod(spark, inputSql, statement, pjp)

case _ =>

dataset = pjp.proceed(pjp.getArgs).asInstanceOf[Dataset[Row]]
}
dateset
}

// 建表必须带有 lifecycle 字段,并对表名进行校验,将相关信息注册到元数据系统等操作

def createMethod(): Unit = {
...
}

// 约定 create table as select 生成的表都为中间表,必须以 tdl_ 开头,lifecycle 固定为7天

def createAsSelectMethod(): Unit = {
...
}

// select 对多个数据库源进行判定以及对权限进行校验,下面以tidb为例

def selectMethod(spark: SparkSession,
inputSql: String,
statement: Statement,
pjp: ProceedingJoinPoint)www.mhylpt.com/: Dataset[Row] = {
val tableData = statement.asInstanceOf[TableData]
//获取所有需要访问的源表
tableData.getInputTables.toArray.foreach {
case t: TableSource =>
val databaseName = t.getDatabaseName
val tableName = t.getTableName
val fullTableName = databaseName + "." + tableName
//所有tidb的库都以tidb为前缀
if (t.getDatabaseName.startsWith("tidb")) {
//对tidb表权限进行校验
if(tableAuthCheck(...)){
//判断tiContext是否初始化
if (tiContext == null) {
tiContext = new TiContext(spark)
}
//对tidb表的表名进行替换,避免与现有的临时表/中间表冲突
val replacedTable = "tdl_" + databaseName + "_" + tableName
//加入tidb表数据源
tiContext.tidbMapTable(databaseName, tableName)
//注册为临时表
tiContext.getDataFrame(databaseName, tableName).createOrReplaceTempView(replacedTable)
//将sql语句中的表名进行替换
sql = StringUtils.replace(sql,www.dasheng178.com fullTableName, replacedTable)
} else {
throw new IllegalAccessError(fullTableName + "没有访问权限")
}
}
case _ =>
}
pjp.proceed(Array(sql)www.thd540.com).asInstanceOf[Dataset[Row]]

配置

pom.xml
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>

<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.1</version>
</dependency>

<!--公司内部版本,用于支持spark2.3-->

<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-core</artifactId>
<version>1.1-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
resources/META-INF/AspectSql.aj
<?xml version="1.0" encoding=www.taohuayuan178.com/"UTF-8" ?>
<aspectj>
<aspects>
<aspect name="cn.tongdun.aspectj.SparkSqlAspect"/>
</aspects>
<weaver options="-Xset:weaveJavaxPackages=true"/>
</aspectj>
spark-defaults.conf
spark.driver.extraClassPath /path/to/spark-aspectj.jar
spark.driver.extraJavaOptions -javaagent:/home/admin/aspectjweaver-1.9.1.jar
结语
通过上述的操作,在用户调用 spark.sql(...) 时将会触发相应的方法。hdfs/rdd/sparkSession/etc. 操作同理可实现。

不同公司面临的真实场景各有不同,因此并没有过多的实现细节,仅给需要的同学提供一些思路

转载于:https://www.cnblogs.com/qwangxiao/p/9642853.html

你可能感兴趣的文章
Android全屏
查看>>
HTML 标签。
查看>>
[bzoj2783][JLOI2012]树_树的遍历
查看>>
2018.10.20 bzoj1068: [SCOI2007]压缩(区间dp)
查看>>
Perl的IO操作(2):更多文件句柄模式
查看>>
由拖库攻击谈口令字段的加密策略
查看>>
Alpha 冲刺 (4/10)
查看>>
并发编程之线程池进程池
查看>>
初始化 Flask 虚拟环境 命令
查看>>
脚本简介jQuery微信开放平台注册表单
查看>>
将PHP数组输出为HTML表格
查看>>
Java中的线程Thread方法之---suspend()和resume() 分类: ...
查看>>
经典排序算法回顾:选择排序,快速排序
查看>>
BZOJ2213 [Poi2011]Difference 【乱搞】
查看>>
c# 对加密的MP4文件进行解密
查看>>
Flask 四种响应类型
查看>>
AOP面向切面编程C#实例
查看>>
怎么让win7右下角只显示时间不显示日期 ?(可行)
查看>>
AngularJs学习笔记-慕课网AngularJS实战
查看>>
数据库三大范式
查看>>