# 【风控算法】二、SQL->Python->PySpark计算KS，AUC及PSI

KS，AUC 和 PSI 是风控算法中最常计算的几个指标，本文记录了多种工具计算这些指标的方法。

```复制```import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql import SparkSession, functions
from sklearn.metrics import roc_auc_score,roc_curve

tmptable = pd.DataFrame({'y':[np.random.randint(2) for i in range(1000000)]})
tmptable['y'] = tmptable['score'].apply(lambda x:1 if np.random.rand()+x>0.8 else 0)
tmp_sparkdf = spark.createDataFrame(tmptable)
tmp_sparkdf.craeteOrReplaceTempView('tmpview')
``````

# 一、KS

​KS 指标来源于 Kolmogorov-Smirnov 检验，通常用于比较两组样本是否来源于同一分布。在建模中划分训练集与测试集后，通常运用 KS 检验来检验训练集与测试集的分布差异，如果分布差异过大，那可能就会因为训练集、测试集划分不合理而降低模型的泛化性。（关于 KS 检验的更多细节

1. 按模型分从小到大排序，并分为 n 组（等频分组或每个不同的分值作为一组）
2. 计算截至每一组的累积好样本(y=0)占比与累积坏样本(y=1)占比，记为 𝑐𝑢𝑚𝑔𝑜𝑜𝑑𝑟𝑎𝑡𝑖𝑜𝑖$cumgoodrati{o}_{i}$𝑐𝑢𝑚𝑏𝑎𝑑𝑟𝑎𝑡𝑖𝑜𝑖$cumbadrati{o}_{i}$
如第 k 组：
累积好样本占比=第 k 组前包括第 k 组 y=0 样本数量 / 全部 y=0 样本的数量
累积坏样本占比=第 k 组前包括第 k 组 y=1 样本数量 / 全部 y=1 样本的数量
3. 𝐾𝑆=𝑚𝑎𝑥(𝑎𝑏𝑠(𝑐𝑢𝑚𝑔𝑜𝑜𝑑𝑟𝑎𝑡𝑖𝑜𝑖𝑐𝑢𝑚𝑏𝑎𝑑𝑟𝑎𝑡𝑖𝑜𝑖))$KS=max\left(abs\left(cumgoodrati{o}_{i}-cumbadrati{o}_{i}\right)\right)$

## 1. SQL 计算 KS

```复制```select max(abs(cumgood/totalgood-cumbad/totalbad)) as ks
from (
select score,
sum(totalgood)over(order by score) as cumgood,
sum(totalgood) over() as totalgood
from (
select
score,
sum(1-y) as totalgood
from tmpview
group by score
)
)
``````

## 2. Python 计算 KS

```复制```def get_ks(y_true:pd.Series,y_pred:pd.Series):
'''
A staticmethod to caculate the KS of the model.
Args:
y_true: true value of the sample
y_pred: pred value of the sample

Returns:
max(tpr-fpr): KS of the model
'''
fpr,tpr,_ = roc_curve(y_true,y_pred)
return str(max(abs(tpr-fpr)))
ksdata = spark.sql('select * from tmpview').toPandas()
print(get_ks(ksdata['y'],ksdata['score']))
``````

## 3. Pyspark 计算 KS

a. SQL 逻辑改写

```复制```ksdata = spark.sql('select * from tmpview')

def calks(df,ycol='y',scorecol='score'):
return df.withColumn(ycol,F.col(ycol).cast('int')).withColumn(scorecol,F.col(scorecol).cast('float'))\
.withColumn('totalgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.lit(1))))\
.withColumn('cumgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.col(scorecol).asc())))\
calks(ksdata).show()
``````

b. python 转 UDF 函数

```复制```def get_ks(y_true:pd.Series,y_pred:pd.Series):
'''
A staticmethod to caculate the KS of the model.
Args:
y_true: true value of the sample
y_pred: pred value of the sample

Returns:
max(tpr-fpr): KS of the model
'''
fpr,tpr,_ = roc_curve(y_true,y_pred)
return str(max(abs(tpr-fpr)))
get_ks_udfs = F.udf(get_ks, returnType=StringType())
ksdata = spark.sql('select * from tmpview')
print(ksdata.withColumn('eval metrics',F.lit('KS'))\
.groupby('eval metrics')\
.agg(get_ks_udfs(F.collect_list(F.col('y')),F.collect_list(F.col('score'))).alias('KS'))\
.select('KS').toPandas())
``````

# 二、AUC

AUC（Area Under Curve）被定义为 ROC 曲线下与坐标轴围成的面积，通常用来衡量二分类模型全局的区分能力。在 python 和 pyspark 中可以直接调包计算，在 SQL 中可以根据公式计算获得，其计算方法如下：

1. 对 score 从小到大排序

2. 根据公式计算：

𝐴𝑈𝐶=𝑖𝑝𝑜𝑠𝑖𝑡𝑖𝑣𝑒𝐶𝑙𝑎𝑠𝑠𝑟𝑎𝑛𝑘𝑖𝑀(1+𝑀)2𝑀×𝑁$AUC=\frac{\sum _{i\in positiveClass}ran{k}_{i}-\frac{M\left(1+M\right)}{2}}{M×N}$

其中，𝑟𝑎𝑛𝑘𝑖$ran{k}_{i}$ 代表第 i 个正样本的排序序号，M 和 N 分别代表正样本和负样本的总个数。

## 1. SQL 计算 AUC

```复制```select (sumpositivernk-totalbad*(1+totalbad)/2)/(totalbad*totalgood) as auc
from
(
select sum(if(y=1,rnk,0)) as sumpositivernk,
sum(1-y) as totalgood
from
(
select y,row_number() over (order by score) as rnk
from tmpview
)
)
``````

## 2. Python 计算 AUC

```复制```ksdata = spark.sql('select * from tmpview').toPandas()
print(roc_auc_score(ksdata['y'],ksdata['score']))
``````

## 3. Pyspark 计算 AUC

a. SQL 逻辑改写

```复制```aucdata = spark.sql('select * from tmpview')

def calauc(df,ycol='y',scorecol='score'):
return df.withColumn(ycol,F.col(ycol).cast('int')).withColumn(scorecol,F.col(scorecol).cast('float'))\
.withColumn('totalgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.lit(1))))\
.withColumn('rnk2',F.row_number().over(Window.orderBy(F.col(scorecol).asc())))\
.filter(F.col(ycol)==1)\

calauc(aucdata).show()
``````

b. UDF 函数

```复制```def auc(ytrue,ypred):
return str(roc_auc_score(ytrue,ypred))
get_auc_udfs = F.udf(auc, returnType=StringType())
aucdata = spark.sql('select * from tmpview')
aucdata.withColumn('eval metrics',F.lit('AUC'))\
.groupby('eval metrics')\
.agg(get_auc_udfs(F.collect_list(F.col('y')),F.collect_list(F.col('score'))).alias('AUC'))\
.select('AUC').show()
``````

c. 调包

```复制```from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='score',labelCol='y')
aucdata = spark.sql('select * from tmpview')
evaluator.evaluate(aucdata)
``````

# 三、PSI

PSI（Population Stability Index：群体稳定性指标），通常被用于衡量两个样本模型分分布的差异，在风控建模中通常有两个作用：

1. 用于建模时筛选掉不稳定的特征
2. 用于建模后及上线后评估和监控模型分值的稳定程度

## 1. SQL 计算 PSI

```复制```select
sum(grouppsi) as psi
from (
select g
,log(count(1) / sum(count(1))over() / 0.1)*(count(1) / sum(count(1))over() - 0.1) as grouppsi
from (
select
case when score<cutpoint[1] then 1
when score<cutpoint[2] then 2
when score<cutpoint[3] then 3
when score<cutpoint[4] then 4
when score<cutpoint[5] then 5
when score<cutpoint[6] then 6
when score<cutpoint[7] then 7
when score<cutpoint[8] then 8
when score<cutpoint[9] then 9
when score<cutpoint[10] then 10 else 'error' end as g
from (
select *
,array(0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1) as cutpoint
from tmpview
)
)
group by g
)
``````

## 2. Python 计算 PSI

```复制```psidata = spark.sql('select * from tmpview').toPandas()
psidata['g'] = pd.cut(psidata['score'],cut_point)
psitable = psidata.groupby('g')['y'].count()
psitable /= psitable.sum()
standratio = 1/(len(cut_point)-1)
psi = sum((psitable-standratio)*np.log(psitable/standratio))
``````

## 3. Pyspark 计算 PSI

```复制```from pyspark.ml.feature import Bucketizer

def psi(df, splits, inputCol, outputCol):
if len(splits) < 2:
raise RuntimeError("splits's length must grater then 2.")

standratio = 1 / (len(splits)-1)
bucketizer = Bucketizer(
splits=splits, inputCol=inputCol, outputCol='split')
with_split = bucketizer.transform(df)
with_split = with_split.groupby('split')\
.agg((F.count(F.col(inputCol))/F.sum(F.count(F.col(inputCol))).over(Window.orderBy(F.lit(1)))).alias('groupratio'))\
.select(F.sum((F.col('groupratio')-standratio)*F.log(F.col('groupratio')/standratio)).alias('PSI'))

return with_split
psi(aucdata,cut_point,'score','group').show()
``````

# 参考资料

SQL 计算多模型分的 PSI

Pyspark 实现连续分桶映射并自定义标签