Azure Synapse无服务器SQL池 - 如何优化笔记本
是否有更好的方法来优化以下笔记本?目前,运行需要2分20秒。如何提高性能?任何建议将不胜感激。谢谢。
环境:
- 中型火花池(8个VCORES/64 GB),带有3-30个节点和10位执行人
- ADLSG2 Premium(固态驱动器)
设置环境变量
environment = "mydatalake"
fileFormat = "parquet"
函数 - 设置在哪里加载源parquet文件的路径
tblName = ""
fldrName = ""
dbName = ""
filePrefix = ""
# Create the function
def fn_PathSource(fldrName,dbName,tblName,fileFormat,filePrefix):
str_path0 = "spark.read.load("
str_path1 = "'abfss://"
str_path2 = ".dfs.core.windows.net/sources"
str_path3 = ", format="
return f"{str_path0}{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}'{str_path3}'{fileFormat}')"
- 设置将表数据存储在DataLake函数中的路径
# Create the variables used by the function
tblName = ""
fldrName = ""
dbName = ""
# Create the function
def fn_Path(fldrName,dbName,tblName):
str_path1 = "abfss://"
str_path2 = ".dfs.core.windows.net"
return f"{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/"
的记录加载数据帧的最新版本
import hashlib
from pyspark.sql.functions import md5, concat_ws,col
# Create the variables used by the function
uniqueId = ""
versionId = ""
tblName = ""
# Create the function
def fn_ReadLatestVrsn(uniqueId,versionId,tblName):
df_Max = spark.sql(f"SELECT {uniqueId},MAX({versionId}) AS {versionId}Max FROM {tblName} GROUP BY {uniqueId}")
df_Max.createOrReplaceTempView(f"{tblName}Max")
df_Latest = spark.sql(f"SELECT {uniqueId},{versionId}Max FROM {tblName}Max")
df_Latest = df_Latest.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}Max").cast("string"))))
df_Latest.createOrReplaceTempView(f"{tblName}Latest")
df_Hash = spark.sql(f"SELECT * FROM {tblName} t1")
df_Hash = df_Hash.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}").cast("string"))))
df_Hash.createOrReplaceTempView(f"{tblName}Hash")
df_Final = spark.sql(f"SELECT DISTINCT t1.* FROM {tblName}Hash t1 INNER JOIN {tblName}Latest t2 ON t1.HashKey = t2.HashKey")
df_Final.createOrReplaceTempView(f"{tblName}")
return spark.sql(f"SELECT * FROM {tblName}")
- 获取带有源表数据
DF_tblBitSize = eval(fn_PathSource("silver","MineDB","tblBitSize","parquet","_dbo_"))
DF_tblDailyReport = eval(fn_PathSource("silver","MineDB","tblDailyReport","parquet","_dbo_"))
DF_tblDailyReportHole = eval(fn_PathSource("silver","MineDB","tblDailyReportHole","parquet","_dbo_"))
DF_tblDailyReportHoleActivity = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivity","parquet","_dbo_"))
DF_tblDailyReportHoleActivityHours = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivityHours","parquet","_dbo_"))
DF_tblDailyReportShift = eval(fn_PathSource("silver","MineDB","tblDailyReportShift","parquet","_dbo_"))
DF_tblDrill = eval(fn_PathSource("silver","MineDB","tblDrill","parquet","_dbo_"))
DF_tblDrillType = eval(fn_PathSource("silver","MineDB","tblDrillType","parquet","_dbo_"))
DF_tblEmployee = eval(fn_PathSource("silver","MineDB","tblEmployee","parquet","_dbo_"))
DF_tblHole = eval(fn_PathSource("silver","MineDB","tblHole","parquet","_dbo_"))
DF_tblMineProject = eval(fn_PathSource("silver","MineDB","tblMineProject","parquet","_dbo_"))
DF_tblShift = eval(fn_PathSource("silver","MineDB","tblShift","parquet","_dbo_"))
DF_tblUnit = eval(fn_PathSource("silver","MineDB","tblUnit","parquet","_dbo_"))
DF_tblUnitType = eval(fn_PathSource("silver","MineDB","tblUnitType","parquet","_dbo_"))
DF_tblWorkSubCategory = eval(fn_PathSource("silver","MineDB","tblWorkSubCategory","parquet","_dbo_"))
DF_tblWorkSubCategoryType = eval(fn_PathSource("silver","MineDB","tblWorkSubCategoryType","parquet","_dbo_"))
DF_v_Dashboards_CompanyContracts= eval(fn_PathSource("silver","MineDB","v_Dashboards_CompanyContracts","parquet","_"))
DF_v_DailyReportShiftDrillers = eval(fn_PathSource("silver","MineDB","v_DailyReportShiftDrillers","parquet","_"))
DF_v_ActivityCharges = eval(fn_PathSource("silver","MineDB","v_ActivityCharges","parquet","_"))
将数据框架转换为临时视图,这些视图可以在SQL
DF_tblBitSize.createOrReplaceTempView("tblBitSize")
DF_tblDailyReport.createOrReplaceTempView("tblDailyReport")
DF_tblDailyReportHole.createOrReplaceTempView("tblDailyReportHole")
DF_tblDailyReportHoleActivity.createOrReplaceTempView("tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours.createOrReplaceTempView("tblDailyReportHoleActivityHours")
DF_tblDailyReportShift.createOrReplaceTempView("tblDailyReportShift")
DF_tblDrill.createOrReplaceTempView("tblDrill")
DF_tblDrillType.createOrReplaceTempView("tblDrillType")
DF_tblEmployee.createOrReplaceTempView("tblEmployee")
DF_tblHole.createOrReplaceTempView("tblHole")
DF_tblMineProject.createOrReplaceTempView("tblMineProject")
DF_tblShift.createOrReplaceTempView("tblShift")
DF_tblUnit.createOrReplaceTempView("tblUnit")
DF_tblUnitType.createOrReplaceTempView("tblUnitType")
DF_tblWorkSubCategory.createOrReplaceTempView("tblWorkSubCategory")
DF_tblWorkSubCategoryType.createOrReplaceTempView("tblWorkSubCategoryType") DF_v_Dashboards_CompanyContracts.createOrReplaceTempView("v_Dashboards_CompanyContracts")
DF_v_DailyReportShiftDrillers.createOrReplaceTempView("v_DailyReportShiftDrillers")
DF_v_ActivityCharges.createOrReplaceTempView("v_ActivityCharges")
加载中使用最新数据中的最新数据到
现有记录更新(更新现有记录)(或在源系统表中发生软删除),Azure数据工厂通过创建增量木木材料文件捕获会捕获的变化。创建新记录时也会发生同样的情况。在合并过程中,所有增量文件都合并为一个镶木式文件。对于已更新的现有记录(或发生了软删除),合并创建了该记录的两个版本,并附加了最新版本。如果要查询合并的木木木文件,您会看到一个重复的记录。 因此,要仅查看该记录的最新版本,我们需要删除先验版本。此功能将确保我们正在查看所有记录的最新版本。
** Special note: this logic is not necessary for tables with records that do not get soft deleted (eg tables without a LastModDateTime or ActiveInd column), therefore, we do not apply this function to those tables
DF_tblBitSize = fn_ReadLatestVrsn("BitSizeID","LastModDateTime","tblBitSize")
DF_tblDailyReport = fn_ReadLatestVrsn("DailyReportID","LastModDateTime","tblDailyReport")
DF_tblDailyReportHole = fn_ReadLatestVrsn("DailyReportHoleID","LastModDateTime","tblDailyReportHole")
DF_tblDailyReportHoleActivity = fn_ReadLatestVrsn("DailyReportHoleActivityID","LastModDateTime","tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours = fn_ReadLatestVrsn("DailyReportHoleActivityHoursID","LastModDateTime","tblDailyReportHoleActivityHours")
DF_tblDailyReportShift = fn_ReadLatestVrsn("DailyReportShiftID","LastModDateTime","tblDailyReportShift")
DF_tblDrill = fn_ReadLatestVrsn("DrillID","LastModDateTime","tblDrill")
DF_tblEmployee = fn_ReadLatestVrsn("EmployeeID","LastModDateTime","tblEmployee")
DF_tblHole = fn_ReadLatestVrsn("HoleID","LastModDateTime","tblHole")
DF_tblMineProject = fn_ReadLatestVrsn("MineProjectID","LastModDateTime","tblMineProject")
DF_tblShift = fn_ReadLatestVrsn("ShiftID","LastModDateTime","tblShift")
DF_tblWorkSubCategoryType = fn_ReadLatestVrsn("WorkSubCategoryTypeID","LastModDateTime","tblWorkSubCategoryType")
CTE_UnitConversion
%%sql
CREATE OR REPLACE TEMP VIEW CTE_UnitConversion AS
(
SELECT
u.UnitID
,ut.UnitType
,u.UnitName
,u.UnitAbbr
,COALESCE(CAST(u.Conversion AS FLOAT),1) AS Conversion
FROM
tblUnit u
INNER JOIN tblUnitType ut
ON u.UnitTypeID = ut.UnitTypeID
AND ut.UnitType IN ('Distance','Depth')
UNION
SELECT
-1 AS UnitID
,'Unknown' AS UnitType
,'Unknown' AS UnitName
,'Unknown' AS UnitAbbr
,1 AS Conversion
)
CTE_Dashboards_BaseData
%%sql
CREATE OR REPLACE TEMP VIEW CTE_Dashboards_BaseData AS
(
SELECT
CC.ContractID,
CC.ProjectID,
CAST(DR.ReportDate AS DATE) AS ReportDate,
D.DrillID,
CAST(D.DrillName AS STRING) AS DrillName,
DT.DrillTypeID,
CAST(DT.DrillType AS STRING) AS DrillType,
CAST(NULL AS STRING) AS HoleName,
CAST(S.ShiftName AS STRING) AS ShiftName,
STRING(CONCAT(E.LastName,' ',E.FirstName)) AS Supervisor,
CAST(DRSD.Drillers AS STRING) AS Driller,
CAST(NULL AS FLOAT) AS TotalMeterage,
CAST(NULL AS FLOAT) AS Depth,
CAST(NULL AS STRING) AS DepthUnit,
CAST(NULL AS FLOAT) AS ManHours,
CAST(NULL AS FLOAT) AS Payrollhours,
CAST(NULL AS FLOAT) AS ActivityHours,
CAST(NULL AS FLOAT) AS EquipmentHours,
CAST(NULL AS FLOAT) AS Quantity,
CAST(NULL AS STRING) AS Category,
CAST(NULL AS STRING) AS SubCategory,
CAST(NULL AS STRING) AS HoursType,
CAST(NULL AS STRING) AS BitSize,
CAST(DRS.DailyReportShiftID AS BIGINT) AS DailyReportShiftID,
CAST(DRS.ShiftID AS INT) AS ShiftID,
CAST(NULL AS TIMESTAMP) AS CompleteDateTime,
CAST(NULL AS STRING) AS HoleCompletionStatus,
CAST(NULL AS STRING) AS Notes,
CAST(NULL AS INT) AS HoleID,
CAST(NULL AS FLOAT) AS DistanceFrom,
CAST(NULL AS FLOAT) AS DistanceTo,
CAST(NULL AS STRING) AS DistanceFromToUnit,
CAST(NULL AS FLOAT) AS Distance,
CAST(NULL AS STRING) AS DistanceUnit,
CAST(NULL AS STRING) AS FluidUnit,
CAST(NULL AS FLOAT) AS FluidVolume,
CAST(NULL AS STRING) AS UID,
CAST(NULL AS FLOAT) AS MaxDepth,
CAST(NULL AS FLOAT) AS Penetration,
CAST(NULL AS FLOAT) AS Charges,
CAST(DR.Status AS STRING) AS Status,
CAST(DRS.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
v_Dashboards_CompanyContracts CC
LEFT JOIN tblDailyReport DR ON CC.ContractID = DR.ContractID AND CC.ProjectID = DR.ProjectID
LEFT JOIN tblDailyReportShift DRS ON DR.DailyReportID = DRS.DailyReportID
LEFT JOIN tblShift S ON DRS.ShiftID = S.ShiftID
LEFT JOIN tblDrill D ON DR.DrillID = D.DrillID
LEFT JOIN tblDrillType DT ON D.DrillTypeID = DT.DrillTypeID
LEFT JOIN tblEmployee E ON DRS.SupervisorID = E.EmployeeID
LEFT JOIN v_DailyReportShiftDrillers DRSD ON DRS.DailyReportShiftID = DRSD.DailyReportShiftID
WHERE
DR.Status <> 'Deleted'
)
CTE_DailyReportHoleActivityManHours
%%sql
CREATE OR REPLACE TEMP VIEW CTE_DailyReportHoleActivityManHours AS
(
SELECT
DailyReportHoleActivityID
,SUM(HoursAsFloat) AS ManHours
FROM
tblDailyReportHoleActivityHours
WHERE
ActiveInd = 'Y'
GROUP BY
DailyReportHoleActivityID
)
Activity charges
%%sql
CREATE OR REPLACE TEMP VIEW SECTION_1 AS
(
SELECT
BD.ContractID
,BD.ProjectID
,CAST(ReportDate AS DATE) AS ReportDate
,DrillID
,DRHA.Depth
,DPU.UnitAbbr AS DepthUnit
,DPU.UnitID AS DepthUnitID
,DRHAMH.ManHours
,DRHA.ActivityHoursAsFloat AS ActivityHours
,WSC.WorkSubCategoryName AS Category
,WSCT.TypeName AS SubCategory
,CASE
WHEN (COALESCE(AC.Charges,0) = 0 AND COALESCE(AC.BillableCount, 0) = 0) OR DRHA.Billable='N' THEN 'Non-Billable'
WHEN AC.DefinedRateName IS NOT NULL AND DRHA.Billable <> 'N' THEN AC.DefinedRateName
ELSE WSC.WorkSubCategoryName
END AS HoursType
,BS.BitSizeID AS BitSizeID
,BS.BitSize
,DRHA.BitID AS BitID
,BD.DailyReportShiftID
,DRHA.Notes
,H.HoleID
,DRHA.DistanceFrom
,DRHA.DistanceTo
,DFU.UnitAbbr AS DistanceFromToUnit
,DFU.UnitID AS DistanceFromToUnitID
,DRHA.Distance
,DU.UnitID AS DistanceUnitID
,CASE
WHEN WSC.WorkCategoryId = 1 THEN MAX(COALESCE(DRHA.DistanceTo, 0)) OVER ( PARTITION BY H.HoleID, WSC.WorkSubCategoryName ORDER BY H.HoleID, ReportDate, BD.ShiftID, DRHA.SequenceNumber, DRHA.CreateDateTime, DRHA.DistanceTo)
ELSE NULL
END AS MaxDepth
,CASE
WHEN WSC.WorkCategoryId = 1 THEN DRHA.Penetration
ELSE 0
END AS Penetration
,COALESCE(AC.Charges,0) AS Charges
,BD.Status
,H.MineProjectID
,CAST(DRHA.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
CTE_Dashboards_BaseData BD
INNER JOIN tblDailyReportHole DRH ON BD.DailyReportShiftID = DRH.DailyReportShiftID
INNER JOIN tblDailyReportHoleActivity DRHA ON DRH.DailyReportHoleID = DRHA.DailyReportHoleID
INNER JOIN tblWorkSubCategory WSC ON DRHA.WorkSubCategoryID = WSC.WorkSubCategoryID
LEFT JOIN tblHole H ON DRH.HoleID = H.HoleID
LEFT JOIN tblBitSize BS ON DRHA.BitSizeID = BS.BitSizeID
LEFT JOIN tblUnit DPU ON DRHA.DepthUnitID = DPU.UnitID
LEFT JOIN tblUnit DFU ON DRHA.DistanceFromToUnitID = DFU.UnitID
LEFT JOIN tblUnit DU ON DRHA.DistanceUnitID = DU.UnitID
LEFT JOIN tblWorkSubCategoryType WSCT ON DRHA.TypeID = WSCT.WorkSubCategoryTypeID
LEFT JOIN v_ActivityCharges AC ON DRHA.DailyReportHoleActivityID = AC.DailyReportHoleActivityID
LEFT JOIN CTE_DailyReportHoleActivityManHours DRHAMH ON DRHA.DailyReportHoleActivityID = DRHAMH.DailyReportHoleActivityID
WHERE
DRH.ActiveInd = 'Y'
AND DRHA.ActiveInd = 'Y'
)
Create FACT_Activity table
df = spark.sql("""
SELECT
ReportDate
,DrillingCompanyID
,MiningCompanyID
,DrillID
,ProjectID
,ContractID
,LocationID
,HoleID
,DailyReportShiftId
,MineProjectID
,BitID
,TRIM(UPPER(BitSize)) AS BitSize
,-1 AS TimesheetId
,CurrencyID
,TRIM(UPPER(Category)) AS Category
,TRIM(UPPER(SubCategory)) AS SubCategory
,TRIM(UPPER(HoursType)) AS HoursType
,TRIM(UPPER(Notes)) AS Notes
,ApprovalStatus
,Depth AS Depth
,(Depth/COALESCE(Depth.Conversion,1)) AS DepthMeters
,Manhours
,ActivityHours
,DistanceFrom
,DistanceTo
,Distance
,Penetration
,(DistanceFrom/Distance.Conversion) AS DistanceFromMeters
,(DistanceTo/Distance.Conversion) AS DistanceToMeters
,(Distance/Distance.Conversion) AS DistanceMeters
,(Penetration/Distance.Conversion) AS PenetrationMeters
,DepthUnitID
,DistanceFromToUnitID
,Charges
,LastModDateTime
,ReportApprovalRequired
FROM
(
SELECT
COALESCE(CAST(ReportDate AS DATE),'01/01/1900') AS ReportDate
,COALESCE(DrillingCompanyID,-1) AS DrillingCompanyID
,COALESCE(MiningCompanyID,-1) AS MiningCompanyID
,COALESCE(DrillID,-1) AS DrillID
,COALESCE(C.ProjectID, -1) AS ProjectID
,COALESCE(C.ContractID,-1) AS ContractID
,COALESCE(C.LocationID,-1) AS LocationID
,COALESCE(HoleID,-1) AS HoleID
,COALESCE(DailyReportShiftID,-1) AS DailyReportShiftId
,COALESCE(MP.MineProjectID,-1) AS MineProjectID
,COALESCE(BitID,-1) AS BitID
,COALESCE(BitSize,'UNKNOWN') AS BitSize
,COALESCE(DepthUnitID,-1) AS DepthUnitID
,COALESCE(DistanceFromToUnitID,-1) AS DistanceFromToUnitID
,COALESCE(DistanceUnitID,-1) AS DistanceUnitID
,COALESCE(C.CurrencyID,-1) AS CurrencyID
,COALESCE(Category,'Unknown') AS Category
,COALESCE(SubCategory,'UNKNOWN') AS SubCategory
,COALESCE(HoursType,'UNKNOWN') AS HoursType
,SUBSTRING(Notes,0,250) AS Notes
,COALESCE(U.Status,'Unknown') AS ApprovalStatus
,COALESCE(Depth,0) AS Depth
,COALESCE(Manhours,0) AS Manhours
,COALESCE(ActivityHours,0) AS ActivityHours
,COALESCE(DistanceFrom,0) AS DistanceFrom
,COALESCE(DistanceTo,0) AS DistanceTo
,COALESCE(Distance,0) AS Distance
,COALESCE(Penetration,0) AS Penetration
,COALESCE(Charges,0) AS Charges
,COALESCE(CAST(U.LastModDateTime AS TIMESTAMP),'1900/01/01 00:00:00') AS LastModDateTime
,C.ReportApprovalRequired
FROM
SECTION_1 U
LEFT JOIN v_Dashboards_CompanyContracts C ON U.ContractID = C.ContractID AND COALESCE(U.ProjectID,-1) = C.ProjectID
LEFT JOIN tblMineProject MP ON U.MineProjectID = MP.MineProjectID AND MP.ActiveInd = 'Y'
) TBL1
INNER JOIN CTE_UnitConversion Distance ON tbl1.DistanceFromToUnitID = Distance.UnitID
INNER JOIN CTE_UnitConversion Depth ON tbl1.DepthUnitID = Depth.UnitID
""")
创建表并写入
tblName = "fact_activity"
fldrName = "myfolder"
dbName = "mydatabase"
path = fn_Path(fldrName,dbName,tblName)
path
# Reduce the number of parquet files written using coalesce and write the dataframe to the datalake
df.coalesce(1).write.format("parquet").mode("overwrite").save(path)
# Drop the table (only dropping the metadata) if it exists in the lakehouse database
spark.sql(f"DROP TABLE IF EXISTS {dbName}.{tblName}")
# Now create the table (metadata only) and point it at the data in the datalake
spark.sql(f"CREATE TABLE {dbName}.{tblName} USING PARQUET LOCATION '{path}'")
Releast
%%sql
DROP VIEW SECTION_1;
DROP VIEW CTE_DailyReportHoleActivityManHours;
DROP VIEW CTE_Dashboards_BaseData;
DROP VIEW CTE_UnitConversion;
DROP VIEW tblBitSize;
DROP VIEW tblDailyReport;
DROP VIEW tblDailyReportHole;
DROP VIEW tblDailyReportHoleActivity;
DROP VIEW tblDailyReportHoleActivityHours;
DROP VIEW tblDailyReportShift;
DROP VIEW tblDrill;
DROP VIEW tblEmployee;
DROP VIEW tblHole;
DROP VIEW tblMineProject;
DROP VIEW tblShift;
DataLake
del DF_tblBitSize
del DF_tblDailyReport
del DF_tblDailyReportHole
del DF_tblDailyReportHoleActivity
del DF_tblDailyReportHoleActivityHours
del DF_tblDailyReportShift
del DF_tblDrill
del DF_tblDrillType
del DF_tblEmployee
del DF_tblHole
del DF_tblMineProject
del DF_tblShift
del DF_tblUnit
del DF_tblUnitType
del DF_tblWorkSubCategory
del DF_v_Dashboards_CompanyContracts
del DF_v_DailyReportShiftDrillers
del DF_v_ActivityCharges
Is there a better way to optimize the following notebook? Currently it takes 2 minutes and 20 seconds to run. How can I improve performance? Any suggestions would be appreciated. Thanks.
Environment:
- medium sized spark pool (8 vCores/64 GB) with 3-30 nodes and 10 executors
- ADLSG2 premium (solid state drives)
Set the environment variables
environment = "mydatalake"
fileFormat = "parquet"
Function - set the path of where to load the source parquet files from
tblName = ""
fldrName = ""
dbName = ""
filePrefix = ""
# Create the function
def fn_PathSource(fldrName,dbName,tblName,fileFormat,filePrefix):
str_path0 = "spark.read.load("
str_path1 = "'abfss://"
str_path2 = ".dfs.core.windows.net/sources"
str_path3 = ", format="
return f"{str_path0}{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}'{str_path3}'{fileFormat}')"
Function - set the path where the table data will be stored in the datalake
# Create the variables used by the function
tblName = ""
fldrName = ""
dbName = ""
# Create the function
def fn_Path(fldrName,dbName,tblName):
str_path1 = "abfss://"
str_path2 = ".dfs.core.windows.net"
return f"{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/"
Function - get the latest version of the records
import hashlib
from pyspark.sql.functions import md5, concat_ws,col
# Create the variables used by the function
uniqueId = ""
versionId = ""
tblName = ""
# Create the function
def fn_ReadLatestVrsn(uniqueId,versionId,tblName):
df_Max = spark.sql(f"SELECT {uniqueId},MAX({versionId}) AS {versionId}Max FROM {tblName} GROUP BY {uniqueId}")
df_Max.createOrReplaceTempView(f"{tblName}Max")
df_Latest = spark.sql(f"SELECT {uniqueId},{versionId}Max FROM {tblName}Max")
df_Latest = df_Latest.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}Max").cast("string"))))
df_Latest.createOrReplaceTempView(f"{tblName}Latest")
df_Hash = spark.sql(f"SELECT * FROM {tblName} t1")
df_Hash = df_Hash.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}").cast("string"))))
df_Hash.createOrReplaceTempView(f"{tblName}Hash")
df_Final = spark.sql(f"SELECT DISTINCT t1.* FROM {tblName}Hash t1 INNER JOIN {tblName}Latest t2 ON t1.HashKey = t2.HashKey")
df_Final.createOrReplaceTempView(f"{tblName}")
return spark.sql(f"SELECT * FROM {tblName}")
Load data frames with source table data
DF_tblBitSize = eval(fn_PathSource("silver","MineDB","tblBitSize","parquet","_dbo_"))
DF_tblDailyReport = eval(fn_PathSource("silver","MineDB","tblDailyReport","parquet","_dbo_"))
DF_tblDailyReportHole = eval(fn_PathSource("silver","MineDB","tblDailyReportHole","parquet","_dbo_"))
DF_tblDailyReportHoleActivity = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivity","parquet","_dbo_"))
DF_tblDailyReportHoleActivityHours = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivityHours","parquet","_dbo_"))
DF_tblDailyReportShift = eval(fn_PathSource("silver","MineDB","tblDailyReportShift","parquet","_dbo_"))
DF_tblDrill = eval(fn_PathSource("silver","MineDB","tblDrill","parquet","_dbo_"))
DF_tblDrillType = eval(fn_PathSource("silver","MineDB","tblDrillType","parquet","_dbo_"))
DF_tblEmployee = eval(fn_PathSource("silver","MineDB","tblEmployee","parquet","_dbo_"))
DF_tblHole = eval(fn_PathSource("silver","MineDB","tblHole","parquet","_dbo_"))
DF_tblMineProject = eval(fn_PathSource("silver","MineDB","tblMineProject","parquet","_dbo_"))
DF_tblShift = eval(fn_PathSource("silver","MineDB","tblShift","parquet","_dbo_"))
DF_tblUnit = eval(fn_PathSource("silver","MineDB","tblUnit","parquet","_dbo_"))
DF_tblUnitType = eval(fn_PathSource("silver","MineDB","tblUnitType","parquet","_dbo_"))
DF_tblWorkSubCategory = eval(fn_PathSource("silver","MineDB","tblWorkSubCategory","parquet","_dbo_"))
DF_tblWorkSubCategoryType = eval(fn_PathSource("silver","MineDB","tblWorkSubCategoryType","parquet","_dbo_"))
DF_v_Dashboards_CompanyContracts= eval(fn_PathSource("silver","MineDB","v_Dashboards_CompanyContracts","parquet","_"))
DF_v_DailyReportShiftDrillers = eval(fn_PathSource("silver","MineDB","v_DailyReportShiftDrillers","parquet","_"))
DF_v_ActivityCharges = eval(fn_PathSource("silver","MineDB","v_ActivityCharges","parquet","_"))
Convert dataframes to temporary views that can be used in SQL
DF_tblBitSize.createOrReplaceTempView("tblBitSize")
DF_tblDailyReport.createOrReplaceTempView("tblDailyReport")
DF_tblDailyReportHole.createOrReplaceTempView("tblDailyReportHole")
DF_tblDailyReportHoleActivity.createOrReplaceTempView("tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours.createOrReplaceTempView("tblDailyReportHoleActivityHours")
DF_tblDailyReportShift.createOrReplaceTempView("tblDailyReportShift")
DF_tblDrill.createOrReplaceTempView("tblDrill")
DF_tblDrillType.createOrReplaceTempView("tblDrillType")
DF_tblEmployee.createOrReplaceTempView("tblEmployee")
DF_tblHole.createOrReplaceTempView("tblHole")
DF_tblMineProject.createOrReplaceTempView("tblMineProject")
DF_tblShift.createOrReplaceTempView("tblShift")
DF_tblUnit.createOrReplaceTempView("tblUnit")
DF_tblUnitType.createOrReplaceTempView("tblUnitType")
DF_tblWorkSubCategory.createOrReplaceTempView("tblWorkSubCategory")
DF_tblWorkSubCategoryType.createOrReplaceTempView("tblWorkSubCategoryType") DF_v_Dashboards_CompanyContracts.createOrReplaceTempView("v_Dashboards_CompanyContracts")
DF_v_DailyReportShiftDrillers.createOrReplaceTempView("v_DailyReportShiftDrillers")
DF_v_ActivityCharges.createOrReplaceTempView("v_ActivityCharges")
Load latest data into views
When an existing record is updated (or a soft delete occurs) in the source system table, Azure Data Factory captures that change by creating an incremental parquet file. The same occurs when a new record is created. During the merge process, all of the incremental files are merged into one parquet file. For the existing record that was updated (or a soft deleted occured), the merge creates two versions of that record, appending the latest version. If you were to query the merged parquet file, you would see a duplicate record.
Therefore, to see only the latest version of that record, we need to remove the prior version. This function will ensure that we are looking at the most up to date version of all records.
** Special note: this logic is not necessary for tables with records that do not get soft deleted (e.g. tables without a LastModDateTime or ActiveInd column), therefore, we do not apply this function to those tables
DF_tblBitSize = fn_ReadLatestVrsn("BitSizeID","LastModDateTime","tblBitSize")
DF_tblDailyReport = fn_ReadLatestVrsn("DailyReportID","LastModDateTime","tblDailyReport")
DF_tblDailyReportHole = fn_ReadLatestVrsn("DailyReportHoleID","LastModDateTime","tblDailyReportHole")
DF_tblDailyReportHoleActivity = fn_ReadLatestVrsn("DailyReportHoleActivityID","LastModDateTime","tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours = fn_ReadLatestVrsn("DailyReportHoleActivityHoursID","LastModDateTime","tblDailyReportHoleActivityHours")
DF_tblDailyReportShift = fn_ReadLatestVrsn("DailyReportShiftID","LastModDateTime","tblDailyReportShift")
DF_tblDrill = fn_ReadLatestVrsn("DrillID","LastModDateTime","tblDrill")
DF_tblEmployee = fn_ReadLatestVrsn("EmployeeID","LastModDateTime","tblEmployee")
DF_tblHole = fn_ReadLatestVrsn("HoleID","LastModDateTime","tblHole")
DF_tblMineProject = fn_ReadLatestVrsn("MineProjectID","LastModDateTime","tblMineProject")
DF_tblShift = fn_ReadLatestVrsn("ShiftID","LastModDateTime","tblShift")
DF_tblWorkSubCategoryType = fn_ReadLatestVrsn("WorkSubCategoryTypeID","LastModDateTime","tblWorkSubCategoryType")
CTE_UnitConversion
%%sql
CREATE OR REPLACE TEMP VIEW CTE_UnitConversion AS
(
SELECT
u.UnitID
,ut.UnitType
,u.UnitName
,u.UnitAbbr
,COALESCE(CAST(u.Conversion AS FLOAT),1) AS Conversion
FROM
tblUnit u
INNER JOIN tblUnitType ut
ON u.UnitTypeID = ut.UnitTypeID
AND ut.UnitType IN ('Distance','Depth')
UNION
SELECT
-1 AS UnitID
,'Unknown' AS UnitType
,'Unknown' AS UnitName
,'Unknown' AS UnitAbbr
,1 AS Conversion
)
CTE_Dashboards_BaseData
%%sql
CREATE OR REPLACE TEMP VIEW CTE_Dashboards_BaseData AS
(
SELECT
CC.ContractID,
CC.ProjectID,
CAST(DR.ReportDate AS DATE) AS ReportDate,
D.DrillID,
CAST(D.DrillName AS STRING) AS DrillName,
DT.DrillTypeID,
CAST(DT.DrillType AS STRING) AS DrillType,
CAST(NULL AS STRING) AS HoleName,
CAST(S.ShiftName AS STRING) AS ShiftName,
STRING(CONCAT(E.LastName,' ',E.FirstName)) AS Supervisor,
CAST(DRSD.Drillers AS STRING) AS Driller,
CAST(NULL AS FLOAT) AS TotalMeterage,
CAST(NULL AS FLOAT) AS Depth,
CAST(NULL AS STRING) AS DepthUnit,
CAST(NULL AS FLOAT) AS ManHours,
CAST(NULL AS FLOAT) AS Payrollhours,
CAST(NULL AS FLOAT) AS ActivityHours,
CAST(NULL AS FLOAT) AS EquipmentHours,
CAST(NULL AS FLOAT) AS Quantity,
CAST(NULL AS STRING) AS Category,
CAST(NULL AS STRING) AS SubCategory,
CAST(NULL AS STRING) AS HoursType,
CAST(NULL AS STRING) AS BitSize,
CAST(DRS.DailyReportShiftID AS BIGINT) AS DailyReportShiftID,
CAST(DRS.ShiftID AS INT) AS ShiftID,
CAST(NULL AS TIMESTAMP) AS CompleteDateTime,
CAST(NULL AS STRING) AS HoleCompletionStatus,
CAST(NULL AS STRING) AS Notes,
CAST(NULL AS INT) AS HoleID,
CAST(NULL AS FLOAT) AS DistanceFrom,
CAST(NULL AS FLOAT) AS DistanceTo,
CAST(NULL AS STRING) AS DistanceFromToUnit,
CAST(NULL AS FLOAT) AS Distance,
CAST(NULL AS STRING) AS DistanceUnit,
CAST(NULL AS STRING) AS FluidUnit,
CAST(NULL AS FLOAT) AS FluidVolume,
CAST(NULL AS STRING) AS UID,
CAST(NULL AS FLOAT) AS MaxDepth,
CAST(NULL AS FLOAT) AS Penetration,
CAST(NULL AS FLOAT) AS Charges,
CAST(DR.Status AS STRING) AS Status,
CAST(DRS.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
v_Dashboards_CompanyContracts CC
LEFT JOIN tblDailyReport DR ON CC.ContractID = DR.ContractID AND CC.ProjectID = DR.ProjectID
LEFT JOIN tblDailyReportShift DRS ON DR.DailyReportID = DRS.DailyReportID
LEFT JOIN tblShift S ON DRS.ShiftID = S.ShiftID
LEFT JOIN tblDrill D ON DR.DrillID = D.DrillID
LEFT JOIN tblDrillType DT ON D.DrillTypeID = DT.DrillTypeID
LEFT JOIN tblEmployee E ON DRS.SupervisorID = E.EmployeeID
LEFT JOIN v_DailyReportShiftDrillers DRSD ON DRS.DailyReportShiftID = DRSD.DailyReportShiftID
WHERE
DR.Status <> 'Deleted'
)
CTE_DailyReportHoleActivityManHours
%%sql
CREATE OR REPLACE TEMP VIEW CTE_DailyReportHoleActivityManHours AS
(
SELECT
DailyReportHoleActivityID
,SUM(HoursAsFloat) AS ManHours
FROM
tblDailyReportHoleActivityHours
WHERE
ActiveInd = 'Y'
GROUP BY
DailyReportHoleActivityID
)
Activity charges
%%sql
CREATE OR REPLACE TEMP VIEW SECTION_1 AS
(
SELECT
BD.ContractID
,BD.ProjectID
,CAST(ReportDate AS DATE) AS ReportDate
,DrillID
,DRHA.Depth
,DPU.UnitAbbr AS DepthUnit
,DPU.UnitID AS DepthUnitID
,DRHAMH.ManHours
,DRHA.ActivityHoursAsFloat AS ActivityHours
,WSC.WorkSubCategoryName AS Category
,WSCT.TypeName AS SubCategory
,CASE
WHEN (COALESCE(AC.Charges,0) = 0 AND COALESCE(AC.BillableCount, 0) = 0) OR DRHA.Billable='N' THEN 'Non-Billable'
WHEN AC.DefinedRateName IS NOT NULL AND DRHA.Billable <> 'N' THEN AC.DefinedRateName
ELSE WSC.WorkSubCategoryName
END AS HoursType
,BS.BitSizeID AS BitSizeID
,BS.BitSize
,DRHA.BitID AS BitID
,BD.DailyReportShiftID
,DRHA.Notes
,H.HoleID
,DRHA.DistanceFrom
,DRHA.DistanceTo
,DFU.UnitAbbr AS DistanceFromToUnit
,DFU.UnitID AS DistanceFromToUnitID
,DRHA.Distance
,DU.UnitID AS DistanceUnitID
,CASE
WHEN WSC.WorkCategoryId = 1 THEN MAX(COALESCE(DRHA.DistanceTo, 0)) OVER ( PARTITION BY H.HoleID, WSC.WorkSubCategoryName ORDER BY H.HoleID, ReportDate, BD.ShiftID, DRHA.SequenceNumber, DRHA.CreateDateTime, DRHA.DistanceTo)
ELSE NULL
END AS MaxDepth
,CASE
WHEN WSC.WorkCategoryId = 1 THEN DRHA.Penetration
ELSE 0
END AS Penetration
,COALESCE(AC.Charges,0) AS Charges
,BD.Status
,H.MineProjectID
,CAST(DRHA.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
CTE_Dashboards_BaseData BD
INNER JOIN tblDailyReportHole DRH ON BD.DailyReportShiftID = DRH.DailyReportShiftID
INNER JOIN tblDailyReportHoleActivity DRHA ON DRH.DailyReportHoleID = DRHA.DailyReportHoleID
INNER JOIN tblWorkSubCategory WSC ON DRHA.WorkSubCategoryID = WSC.WorkSubCategoryID
LEFT JOIN tblHole H ON DRH.HoleID = H.HoleID
LEFT JOIN tblBitSize BS ON DRHA.BitSizeID = BS.BitSizeID
LEFT JOIN tblUnit DPU ON DRHA.DepthUnitID = DPU.UnitID
LEFT JOIN tblUnit DFU ON DRHA.DistanceFromToUnitID = DFU.UnitID
LEFT JOIN tblUnit DU ON DRHA.DistanceUnitID = DU.UnitID
LEFT JOIN tblWorkSubCategoryType WSCT ON DRHA.TypeID = WSCT.WorkSubCategoryTypeID
LEFT JOIN v_ActivityCharges AC ON DRHA.DailyReportHoleActivityID = AC.DailyReportHoleActivityID
LEFT JOIN CTE_DailyReportHoleActivityManHours DRHAMH ON DRHA.DailyReportHoleActivityID = DRHAMH.DailyReportHoleActivityID
WHERE
DRH.ActiveInd = 'Y'
AND DRHA.ActiveInd = 'Y'
)
Create FACT_Activity table
df = spark.sql("""
SELECT
ReportDate
,DrillingCompanyID
,MiningCompanyID
,DrillID
,ProjectID
,ContractID
,LocationID
,HoleID
,DailyReportShiftId
,MineProjectID
,BitID
,TRIM(UPPER(BitSize)) AS BitSize
,-1 AS TimesheetId
,CurrencyID
,TRIM(UPPER(Category)) AS Category
,TRIM(UPPER(SubCategory)) AS SubCategory
,TRIM(UPPER(HoursType)) AS HoursType
,TRIM(UPPER(Notes)) AS Notes
,ApprovalStatus
,Depth AS Depth
,(Depth/COALESCE(Depth.Conversion,1)) AS DepthMeters
,Manhours
,ActivityHours
,DistanceFrom
,DistanceTo
,Distance
,Penetration
,(DistanceFrom/Distance.Conversion) AS DistanceFromMeters
,(DistanceTo/Distance.Conversion) AS DistanceToMeters
,(Distance/Distance.Conversion) AS DistanceMeters
,(Penetration/Distance.Conversion) AS PenetrationMeters
,DepthUnitID
,DistanceFromToUnitID
,Charges
,LastModDateTime
,ReportApprovalRequired
FROM
(
SELECT
COALESCE(CAST(ReportDate AS DATE),'01/01/1900') AS ReportDate
,COALESCE(DrillingCompanyID,-1) AS DrillingCompanyID
,COALESCE(MiningCompanyID,-1) AS MiningCompanyID
,COALESCE(DrillID,-1) AS DrillID
,COALESCE(C.ProjectID, -1) AS ProjectID
,COALESCE(C.ContractID,-1) AS ContractID
,COALESCE(C.LocationID,-1) AS LocationID
,COALESCE(HoleID,-1) AS HoleID
,COALESCE(DailyReportShiftID,-1) AS DailyReportShiftId
,COALESCE(MP.MineProjectID,-1) AS MineProjectID
,COALESCE(BitID,-1) AS BitID
,COALESCE(BitSize,'UNKNOWN') AS BitSize
,COALESCE(DepthUnitID,-1) AS DepthUnitID
,COALESCE(DistanceFromToUnitID,-1) AS DistanceFromToUnitID
,COALESCE(DistanceUnitID,-1) AS DistanceUnitID
,COALESCE(C.CurrencyID,-1) AS CurrencyID
,COALESCE(Category,'Unknown') AS Category
,COALESCE(SubCategory,'UNKNOWN') AS SubCategory
,COALESCE(HoursType,'UNKNOWN') AS HoursType
,SUBSTRING(Notes,0,250) AS Notes
,COALESCE(U.Status,'Unknown') AS ApprovalStatus
,COALESCE(Depth,0) AS Depth
,COALESCE(Manhours,0) AS Manhours
,COALESCE(ActivityHours,0) AS ActivityHours
,COALESCE(DistanceFrom,0) AS DistanceFrom
,COALESCE(DistanceTo,0) AS DistanceTo
,COALESCE(Distance,0) AS Distance
,COALESCE(Penetration,0) AS Penetration
,COALESCE(Charges,0) AS Charges
,COALESCE(CAST(U.LastModDateTime AS TIMESTAMP),'1900/01/01 00:00:00') AS LastModDateTime
,C.ReportApprovalRequired
FROM
SECTION_1 U
LEFT JOIN v_Dashboards_CompanyContracts C ON U.ContractID = C.ContractID AND COALESCE(U.ProjectID,-1) = C.ProjectID
LEFT JOIN tblMineProject MP ON U.MineProjectID = MP.MineProjectID AND MP.ActiveInd = 'Y'
) TBL1
INNER JOIN CTE_UnitConversion Distance ON tbl1.DistanceFromToUnitID = Distance.UnitID
INNER JOIN CTE_UnitConversion Depth ON tbl1.DepthUnitID = Depth.UnitID
""")
Create the table and write to the datalake
tblName = "fact_activity"
fldrName = "myfolder"
dbName = "mydatabase"
path = fn_Path(fldrName,dbName,tblName)
path
# Reduce the number of parquet files written using coalesce and write the dataframe to the datalake
df.coalesce(1).write.format("parquet").mode("overwrite").save(path)
# Drop the table (only dropping the metadata) if it exists in the lakehouse database
spark.sql(f"DROP TABLE IF EXISTS {dbName}.{tblName}")
# Now create the table (metadata only) and point it at the data in the datalake
spark.sql(f"CREATE TABLE {dbName}.{tblName} USING PARQUET LOCATION '{path}'")
Release SQL views from memory
%%sql
DROP VIEW SECTION_1;
DROP VIEW CTE_DailyReportHoleActivityManHours;
DROP VIEW CTE_Dashboards_BaseData;
DROP VIEW CTE_UnitConversion;
DROP VIEW tblBitSize;
DROP VIEW tblDailyReport;
DROP VIEW tblDailyReportHole;
DROP VIEW tblDailyReportHoleActivity;
DROP VIEW tblDailyReportHoleActivityHours;
DROP VIEW tblDailyReportShift;
DROP VIEW tblDrill;
DROP VIEW tblEmployee;
DROP VIEW tblHole;
DROP VIEW tblMineProject;
DROP VIEW tblShift;
Release data frames from memory
del DF_tblBitSize
del DF_tblDailyReport
del DF_tblDailyReportHole
del DF_tblDailyReportHoleActivity
del DF_tblDailyReportHoleActivityHours
del DF_tblDailyReportShift
del DF_tblDrill
del DF_tblDrillType
del DF_tblEmployee
del DF_tblHole
del DF_tblMineProject
del DF_tblShift
del DF_tblUnit
del DF_tblUnitType
del DF_tblWorkSubCategory
del DF_v_Dashboards_CompanyContracts
del DF_v_DailyReportShiftDrillers
del DF_v_ActivityCharges
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
除外,从内存释放SQL视图和从内存中释放数据帧一切看起来都很好。
如果您的应用程序需要经常查询数据,并且需要创建
views
,则可以在专用的SQL池中创建外部表,并使用Synapse SQL保存表格的视图。这将更加有效,每次您需要数据时都不会删除视图并发布数据范围。您还可以创建并使用本机外部使用Azure Synapse Analytics中的SQL池的表使用与
type = Hadoop
在其外部数据源定义中的外部表相比,本机外部表的性能更好。这是因为本机外部表使用本机代码访问外部数据。您还可以参考最佳实践对于Azure Synapse Analytics中的无服务器SQL池获取有关性能优化的更多详细信息。
Apart from Release SQL views from memory and Release data frames from memory everything looks fine.
If your application required to query the data frequently and there is requirement to create
VIEWS
, you can create the EXTERNAL TABLE in Dedicated SQL Pool and save the VIEWS for the tables using Synapse SQL. This would be more efficient and there won't be need to drop VIEWS and release dataframes every time you need the data.You can also create and use native external tables using SQL pools in Azure Synapse Analytics as Native external tables have better performance when compared to external tables with
TYPE=HADOOP
in their external data source definition. This is because native external tables use native code to access external data.You can also refer Best practices for serverless SQL pool in Azure Synapse Analytics to get more details regarding performance optimization.