WORDS
- 用requires 代替need 表示 依靠,依赖,需要
- persisted 持久化
1 Synapse 的主要功能
Synapse 主要有几个功能:
- 集成分析,可以将gen2里的各类数据源整合在一起,如data lake, Nosql,使得用户可以从多个数据源中获取数据
- 数据仓库,提供了大规模数据存储和查询,采用了分布式架构
- 大数据分析:集成了apache spark引擎
- 实时数据处理:集成了kafka
-
高级的安全性和身份信息:数据加密,身份验证,访问控制
2 Sql data warehouse 的优点
Host a data warehouse via the azure synapse service。
sql database 和azure sql warehouse的区别:
- database适用于传统的web类似于 服务器上托管的sql2008,对于大数据查询性能不好,data warehouse则是分布式的,对大数据友好。
- 如果只是web的功能或者只是想进行表格的增删改查,sql database是个很好的选择
- 但是除了想增删改查,但是又想进行数据分析,这会给传统的database造成压力,所以data warehouse就是最好的选择
- 总而言之,只是进行简单的业务操作使用Sql, 但是既然想业务操作,又想数据分析则warehouse。
3 Create an Azure synapse
创建synapse需要已经部署好的azure data lake gen2 storage account的账号,我们可以使用现有的,也可以创建一个新的
- 在部署时候,我们有两个选项,一个是sql pool 用来build the data warehouse,这里存放的数据往往都是干净的,清洗过后且直接可以用来数据分析或者是图像化的数据,因为这里是需要计算的,所以花费也是较高的。因为他是根据DWU计费,根据计算量付费和存储空间计费。
- 还有一种是serverless sql pool 基于分析的是数据量进行计费。所以可以使用这个进行快速的临时性的数据分析, 例如在data lake 里的各种数据。注意,这里的数据是无法persisted的,这里处理过的数据,需要保存在sql pool 里面。
4 使用synapse链接外部文件(09,10)
回顾:首先你需要
- 一个azure sql database名字为sqldatabase01lg
-
创建一个data lake gen2 account 名字为datalake01lg,并且container里面创建两个containers 分别csv用来存放.csv文件,parquert用来存放二进制的qarquert文件,上穿Log.csv Log.qarquert
创建一个synapse(06,07)
- 在创建synapse的时候必须有gen2 account,可以使用现有的,也可以使用新建的。这里我们已经有一个之前创建的datalake01lg的gen2,但是, 我们现在为了演示从外部来获取数据,所以我们需要创建一个新的gen2 account,名字为datalake02lg
-
在创建的过程中,他会默认创建一个sql server的服务,需要设置用户名和账号,这里的username是sqladmin.
use synapse (09,10)
- 这里我们把之前创建的datalake01lg的gen2 account作为外部数据源,来进行数据的操作
-
选择添加外部数据源
-
选择添加外部数据源的类型,这里是最前面的gen2 container里的数据
-
成功后,我们在Linked里面就可以找到,刚才添加的datalake01lg的外部数据源
-
Then, we clicked the csv and executed to query top 100 data, there was an error occurred,it means we do not have the authority to use this container.
-
datalake02lg needs to be authorized by datalake01lg to use the containers. Now go back to the datalake01lg , find the Access Control and add a role assessment.(give permissions to datalake02lg)
给外部account授权使用datalake01lg的权限
-
找到blob reader
-
添加成员,并授权
-
当我们授权给datalake02lg可以读取01里的Container的成功后,在执行上的query,就可以查询成功
使用外部数据的重要指令
- 创建外部数据源, Create external data source
- 指定外部数据源格式, Create external file format
- 创建外部数据表, Create external table
-
在使用外部数据的时候,需要生成token
读取外部CSV文件(10)
- 创建外部指令的代码
CREATE DATABASE [appdb]; --创建appdb数据库
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'P@ssword@123' --主加密 密钥
CREATE DATABASE SCOPED CREDENTIAL SasToken --数据库范围凭证
WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T09:42:48Z&st=2023-08-11T01:42:48Z&spr=https&sig=TBRMpm7332EjuSYSFC2LKMUTkcjB8dfi%2B3NTNsPhiRU%3D'
CREATE EXTERNAL DATA SOURCE log_data -- 外部数据源
WITH(LOCATION='https://datalake01lg.blob.core.windows.net/csv', --读取外部数据源的位置,文件夹
CREDENTIAL = SasToken ) -- 获取SasToken
- 完成了上述的步骤,我们就已经创建了数据源,现在我们使用指令来,读取外部数据
--创建外部文件源
CREATE EXTERNAL FILE FORMAT TextFileFormat WITH(
FORMAT_TYPE = DELIMITEDTEXT,
FORMAT_OPTIONS(
FIELD_TERMINATOR=',',
FIRST_ROW = 2
)
)
--创建外部表
CREATE EXTERNAL TABLE [logdata]
(
[Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH(
LOCATION ='/Log.csv', --上面地址的根目录下的文件
DATA_SOURCE = log_data, --上面的创建的data source名
FILE_FORMAT = TextFileFormat --上面指定的文件格式
)
SELECT * FROM [logdata]
读取外部parquet 文件
- 我们可以使用上面的sastoken的代码,但是这样会出现问题,如果我们使用上面的sastoken的时候,这个token过期,那么我们就要生成新的token。
- drop 外部表
- drop 外部数据源
- drop 过期的token
- 用上面的方法重新生成新的sastoken
DROP EXTERNAL TABLE [logdata]
DROP EXTERNAL DATA SOURCE log_data
DROP DATABASE SCOPED CREDENTIAL SasToken
- DROP了过期的token后,我们去生成新的sastoken,然后进行parquet的读取,注意这里我们可以直接从设置数据库范围凭证开始,因为前面我们已经设置过了主加密密钥
CREATE DATABASE SCOPED CREDENTIAL SasToken --创建数据库代码凭证
WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T10:15:25Z&st=2023-08-11T02:15:25Z&spr=https&sig=nzwVeqZUBPDHuQETSfeN0nIBy7mOAGYhXzWzXqVKAZ0%3D'
CREATE EXTERNAL DATA SOURCE log_data_parquet -- 创建外部数据格式
WITH(LOCATION='https://datalake01lg.blob.core.windows.net/parquet', --读取外部数据源的位置,文件夹parquet
CREDENTIAL = SasToken ) -- 获取SasToken
--创建外部文件源parquet
CREATE EXTERNAL FILE FORMAT ParquetFileFormat WITH(
FORMAT_TYPE = PARQUET,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
)
--创建外部表
CREATE EXTERNAL TABLE [logdata_parquet]
(
[Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [varchar](100) NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH(
LOCATION ='/Log.parquet', --上面地址的根目录下的文件
DATA_SOURCE = log_data_parquet, --上面的创建的data source名
FILE_FORMAT = ParquetFileFormat --上面指定的文件格式
)
SELECT * FROM [logdata_parquet]
- 注意:和上面csv还是有区别的
- csv表中的Time是时间类型,而parquet里面都是字符串
- 其次在parquet文件中,列名是不能有空格的,所以这个结果下所有有空格的列的值都为空,所以删除列明中的空格就可以成功的显示数据
创建一个SQL pool(016)
-
没有sql pool之前 只有个build-in,此时我们需要创建一个新的pool
-
创建成功后,我们就可以看到
-
在synapse的Data里,我们就看到我们的pool,他和之前的sql serverless 不一样的原因是,serverless只有external表,而在pool里,不仅仅有exeternal table 他还有自己的Table
-
使用sql pool时候,因为我们已经创建过pooldb,所以不需要和serverless一样,创建一个新的库appdb,我们只需要将我的工作环境改为pooldb,类似于sql2008下的 use pooldb,其余代码一模一样
使用sql pool读取外部的csv表格(017)
-
这里我们需要新的token,是datalake01lg的access keys
- 其次注意 LOCATION也发生了改变,其中,注意将blob改为dfs
CREATE DATABASE SCOPED CREDENTIAL AzureStorageCredential
WITH
IDENTITY = 'datalake01lg',
SECRET = 'ZuTNLYAYPx1gmjZHE2+pIizxi4zwOHBYX395HDVQ/wM47yfhHBEBppzMKVegfROIi99SCwCWD04F+AStQjvUxw==';
CREATE EXTERNAL DATA SOURCE log_data
WITH ( LOCATION = 'abfss://csv@datalake01lg.dfs.core.windows.net/parquet',
CREDENTIAL = AzureStorageCredential,
TYPE = HADOOP
)
- SQL POOL charges for hours, so when we don't use it, we can stop and it means you can not use it to calculate ability. But the storage still costs a lot of money.
在sql pool 里加载数据 019
复制csv到sql pool里
-
链接和选择我们自己的sql pool 然后通过建表语句创建一个新的表,我们可以在sql pool里 看到这个新建的表
直接从datalake01lg里的container复制csv表到sql pool里,这里From的链接是从container里获得的csv的链接,Storage Account Key是datalake01lg里的Access Key
COPY INTO [pool_logdata] FROM 'https://datalake010lg.blob.core.windows.net/csv/Log.csv'
WITH(
FILE_TYPE='CSV',
CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='t7Wc/rXNJ5fXU45W6NVxMYUG3XG5ge8dUVlmvSbpMMGonY0puqMDuA4DIcGT3IwA6sYIqQSHcu9U+AStvZLNcw=='),
FIRSTROW=2)
SELECT * FROM [pool_logdata]
-
复制成功后,我们就可以查询pool_logdata里的数据了
复制parquet到sql pool里
- 首先,还是创建一个新的表名为
pool_logdata_parquet
到sql pool里
CREATE TABLE [pool_logdata_parquet]
(
[Correlationid] [varchar](200) NULL,
[Operationname] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Eventcategory] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [varchar](500) NULL,
[Subscription] [varchar](200) NULL,
[Eventinitiatedby] [varchar](1000) NULL,
[Resourcetype] [varchar](1000) NULL,
[Resourcegroup] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL)
- 复制parquet
COPY INTO [pool_logdata_parquet] FROM 'https://datalake244434.blob.core.windows.net/parquet/log.parquet'
WITH(
FILE_TYPE='PARQUET',
CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='vDV2bSKSR44lbE6x05HtFz57DvlK3O2WNkb11te+H+GrBjeXCojnHjiTw3KdYBWXJRSAnOAZNdgB+AStAasz8w==') )
SELECT * FROM [pool_logdata_parquet]
SELECT * FROM [logdata_parquet]
使用Bulk load 加载数据到表中022
-
Linked页面下选择csv文件,前提上面已经链接了外部数据库。
-
注意,我们在加载bulk load的时候,出现一个问题是的读取不到,原因是因为未授权,我们需要去datalake01lg里的Access Control来添加我们datalake02lg account的权限。
-
授权后,选择要处理的文件
-
选择导入的表
-
系统将会自动生成语句,其实这里使用了Azure synapse的pipline
使用PolyBase复制external数据到sql pool的表里023
一种更加有效的方法来加载外部的数据到sql pool里
之前的方法是1. 创建数据库凭证 2.创建外部数据源链接 3.创建外部表结构,这样就建立了外部数据的链接,
如果我们想复用这些的话,可以使用下面代码,读取系统里的外部表,这样我们就可以很轻松的创建各种表,只用创建一次1,2,3
SELECT * FROM sys.database_scoped_credentials --读取外部数据凭证
SELECT * FROM sys.external_data_sources --查询外部数据源
SELECT * FROM sys.external_file_formats --查询外部文件
使用synapse的pipeline复制外部数据到pool里
-
我们使用Integrate里的Copy Data tool
-
在Source里选需要链接的地方和文件
-
配置源文件的格式,这里我们是csv,且第一行是表头
-
选择数据表要复制的地方,这里我们选择pooldb里的已经存在的表
-
创建pipeline成功
-
成功后,我们可以在monitor里面看到我们的pipeline的状态而且pooldb的数据库也有了数据,说明我们的pipeline成功
使用pipeline复制database的数据到sql pool里
-
创建一个Copy数据的pipeline,我们发现之前直接的下拉框找不到我们的database01lg的数据库,我们需要添加一个new connection
-
选择sql 的服务器和sql的数据库
-
后面和上面的一样最后,我们的pipeline运行成功
设计data warehouse
- Fact Table: 实际用到的数据,类似于销售表,你需要的是货号,时间,销量
- Dimension Table: 但是,知道了上面的销售情况,你需要类似于货号关联的商品信息,商品信息关联的客户表等等,这就是Dimension Table
创建一个Fact table
-
使用sql server 2008链接sql pool
-
链接sql database
-
执行连表查询
- 注意:我们在实际的生产中,为了不扰乱原来的数据,我们一般先生成一个临时的fact table,然后通过pipeline来获取这临时表里的数据
创建一个Dimension表
和上面的表一样,我们使用View试图创建表
-- Lab - Building a dimension table
-- Lets build a view for the customers
CREATE VIEW Customer_view
AS
SELECT ct.[CustomerID],ct.[CompanyName],ct.[SalesPerson]
FROM [SalesLT].[Customer] as ct
-- Lets create a customer dimension table
SELECT [CustomerID],[CompanyName],[SalesPerson]
INTO DimCustomer
FROM Customer_view
-- Let's build a view of the products
CREATE VIEW Product_view
AS
SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
FROM [SalesLT].[Product] prod
LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]
-- Lets create a product dimension table
SELECT [ProductID],[ProductModelID],[ProductcategoryID],[ProductName],[ProductModelName],[ProductCategoryName]
INTO DimProduct
FROM Product_view
-- If you want to drop the views and the tables
DROP VIEW Customer_view
DROP TABLE DimCustomer
DROP VIEW Product_view
DROP TABLE DimProduct
从sqldatabase里迁移数据到sql pool里面
-
此时,在sql database里,我们有1张Fact表和2张deminsion表
-
在synapse里的sql pool现在是没有任何表的
在synapse 里创建copy data的pipeline,和上面的几乎一致
-
pipeline完成后,可以看到pooldb里已经有了刚才的三张表
设计表
- Hash Table可以提高大型表的查询性能
-
Round-Robin表可以提高数据的加载性能
创建一个哈希表(Hash Table)
Fact table 使用哈希表, dimension table使用复制表
- 创建一个基于CustomerID的hash distribution table
CREATE TABLE [dbo].[FactSales](
[ProductID] [int] NOT NULL,
[SalesOrderID] [int] NOT NULL,
[CustomerID] [int] NOT NULL,
[OrderQty] [smallint] NOT NULL,
[UnitPrice] [money] NOT NULL,
[OrderDate] [datetime] NULL,
[TaxAmt] [money] NULL
)
WITH
(
DISTRIBUTION = HASH (CustomerID)
)
- 创建好后,我们使用pipleline给上面的表导入数据
创建一个复制表(replicated Tables)
- 创建一个replicated table
-- Replicated Tables
CREATE TABLE [dbo].[FactSales](
[ProductID] [int] NOT NULL,
[SalesOrderID] [int] NOT NULL,
[CustomerID] [int] NOT NULL,
[OrderQty] [smallint] NOT NULL,
[UnitPrice] [money] NOT NULL,
[OrderDate] [datetime] NULL,
[TaxAmt] [money] NULL
)
WITH
(
DISTRIBUTION = REPLICATE
)
使用surrrogate keys for dimension tables
-
没使用surrogate key的表里面的ID是从1自增的
- 我们创建一个有surrogate key的表
CREATE TABLE [dbo].[DimProduct](
[ProductSK] [int] IDENTITY(1,1) NOT NULL, -- surrogate key
[ProductID] [int] NOT NULL,
[ProductModelID] [int] NOT NULL,
[ProductcategoryID] [int] NOT NULL,
[ProductName] varchar(50) NOT NULL,
[ProductModelName] varchar(50) NULL,
[ProductCategoryName] varchar(50) NULL
)
-
创建成功后,我们将数据迁移的sql pool里,发现key不是从1自增的,而是根据azure的分布来给的ID
Slowly Changing the dimension table
-
我们再更改dimension table的时候,例如一个商品详情表的一个商品的名称,如果更改只是少量的一次性的,且以后不会在用到的时候,我们可以一次性更新。
但是,如果我们想更新一下很多商品的价格,但是这个旧的价格,可能在以后数据分析中使用,那么我们就需要给他一个标识
Indexs
- 在传统数据库中,我们使用索引是很常见的,为了提高查询效率
- 在azure中,因为已经有个distribution ,所以在group by ,join的使用上效率大大提高。但是如果需要使用where进行过滤,我们则需要使用index
创建聚集性索引(clustered index) 和非聚集性索引(none-clustered index)
- 1.创建一个新的表pool_logdata,使用hash distribution
CREATE TABLE [pool_logdata]
(
[Correlation id] [varchar](200) NULL,
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH
(
DISTRIBUTION = HASH ([Operation name])
)
-
2.使用之前旧的pipeline来转移数据
- 3.查看一下数据分布, 我们发现,数据被分在60个块里,但是有些rows是0,这就是数据倾斜
DBCC PDW_SHOWSPACEUSED('[dbo].[pool_logdata]')
- 4.创造一个聚集性索引
-- Note on creating Indexes
CREATE TABLE [logdata]
(
[Correlation id] [varchar](200) NULL,
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH
(
DISTRIBUTION = HASH ([Operation name]),
CLUSTERED INDEX ([Resource type]) -- 聚集性索引
)
CREATE INDEX EventCategoryIndex ON [logdata] ([Event category]) --非聚集性索引
创建一个heap table 临时表
少于6000w数据的临时表,使用heap table,大于这个数使用clustered columnstore table.
-- Heap tables
CREATE TABLE [logdata]
(
[Correlation id] [varchar](200) NULL,
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH
(
HEAP,
DISTRIBUTION = ROUND_ROBIN
)
CREATE INDEX ResourceTypeIndex ON [logdata] ([Resource type])
遍历分区表
- 便利分区表可以提高带有where的语句查询效率,使用分区,可以使你的数据分层不同的区域,将数据划分呢成更加的分组 ,他和distribution是不同的。partitions一般是基于时间来进行的分区的,既如果经常使用到日期作为查询条件,那么partitions就很有效。支持 hash distribution tables, heap tables ,round robin tables,
replicated tables, clustered column tables
创建一个Partition table
CREATE TABLE [logdata]
(
[Correlation id] [varchar](200) NULL,
[Operation name] [varchar](200) NULL,
[Status] [varchar](100) NULL,
[Event category] [varchar](100) NULL,
[Level] [varchar](100) NULL,
[Time] [datetime] NULL,
[Subscription] [varchar](200) NULL,
[Event initiated by] [varchar](1000) NULL,
[Resource type] [varchar](1000) NULL,
[Resource group] [varchar](1000) NULL,
[Resource] [varchar](2000) NULL
)
WITH
(
DISTRIBUTION = HASH ([Operation name]),
PARTITION ( [Time] RANGE RIGHT FOR VALUES
('2023-01-01','2023-02-01','2023-03-01','2023-04-01'))
)
-
创建后的表的分区结构是
读取json数据
- 给datalake01lg里的container加一个json