SSIS Partition Switching Template gérant la compression et les ColumnStore Indexes

Pour faire suite au post précédent sur l’administration des partitions sous SQL Server (Gestion dynamique des partitions SQL Server), voyons désormais comment créer un package SSIS Template permettant d’implémenter le partition switching en gérant la compression et les ColumnStore Indexes.

La compression de données qu’est-ce c’est ?

Cette fonctionnalité est apparue depuis la version SQL Server 2008, elle permet comme son nom l’indique de compresser les données et donc de gagner de l’espace de stockage. Ce que beaucoup de monde ignore, c’est que la compression permet aussi de réduire les I/O (pour la lecture), les données étant compressées elles sont contenues dans moins de pages réduisant par conséquent le nombre de pages lues lors d’une requête.
Ce que vous devez savoir, il en existe 2 types de compression :

  • la compression par ligne (ROW): les types fixes sont « remplacés » par des types variables pour optimiser le stockage en fonction de la valeur à stocker et non par rapport à son type. le coût de compression est de 4 bits par colonnes permettant d’indiquer entre autre la taille des données. Prenons l’exemple d’un CHAR(10) contenant la valeur ‘ABC’ au lieu d’être stocké sur 10 octets ce dernier ne sera stocké que sur 3 octets + les 4 bits de compression. Là-dessus vous me direz « très bien mais le nvarchar ce n’est pas fait pour les chiens »; et je vous dirais que vous avez raison. Mais ce mécanisme marche aussi sur les types de données numériques, si vous avez un colonne en INT, toutes les valeurs comprises entre 0-255 ne seront stockées que sur un seul octet au lieu de 4 (sans oublier bien sur nos 4 bits de compression😉 )
  • la compression par page (PAGE) est une version améliorée de la compression par ligne. Le moteur SQL ajoute dans la page une structure permettant de référencer des valeurs répétitives et/ou de préfixes récurrents. Le moteur peut combiner les deux stratégies car elles peuvent se révéler complémentaires dans certains cas, par ailleurs, si le moteur ne peut pas déterminer de stratégie il appliquera par défaut une compression par ligne.

Autant vous dire que je n’utilise que la compression par PAGE.

 

Les ColumnStore Indexes (CSI), kézako ?

Il s’agit d’un type d’index apparu avec SQL Server 2012. Cette fonctionnalité utilise le moteur VertiPaq pour compresser et indexer les données en colonnes. Et qui dit nouveau moteur, dit nouveau mode de traitement de données et nouveau plan d’exécution. Les données sont traitées en mode batch et non plus en mode séquentiel.

Faisons un petit comparatif entre un mode de stockage ligne à ligne et un mode de stockage en colonne.

Avec le stockage traditionnel chaque ligne de données est contenue dans une page :

RowIndexes

Ainsi, lorsque nous faisons un simple SELECT COL1, SUM(COL2) FROM MyTable GROUP BY COL1 le moteur doit lire toutes les pages de données pour n’exploiter, au final, que deux colonnes.

En mode colonne le stockage, les pages de données contiennent les valeurs distinctes des différentes :

ColumnIndexes

La même requête ne va donc lire que les pages de données relatives aux colonnes 1 et 2 uniquement.

Les avantages

  • Forte compression des données
  • Seules les colonnes nécessaires sont lues ce qui implique une forte réduction des I/O et une meilleur gestion de la mémoire
  • Les données sont traitées en mode batch

 Les limites

  • Il ne peut y avoir qu’un seul CSI par table.
  • Les indexes clustered ne sont pas encore disponibles (cf. 1ère Key Note du PASS Summit 2012  )
  • La mise en place d’un CSI verrouille la table en lecture, il vous sera donc impossible d’effectuer n’importe quelle opération d’UPDATE, INSERT, DELETE, MERGE
  • Comme il s’agit d’un index non cluter, l’index seek n’est pas possible… Mais rassurez-vous les performances sont au rendez-vous, car les données sont stockées en      segment, chaque segment concerne environ 1 millions lignes et est borné avec une valeur min et une valeur max, ce qui permet au moteur d’effectuer rapidement des éliminations de données par segment.
  • Ne supporte que les types de données traditionnels (int, real, char, string , money, datetime et les numeric de moins de 18 digits. Vous devrez passer votre chemin pour les decimal de plus de 18 digits, les (n)varchar(max), les binary, blob, clr, uniqueidentifier …

Les recommandations

  • Etant donné qu’il ne peut y avoir qu’un seul CSI par table, indexer toutes les colonnes de la table
  • Sur les tables avec un CSI,   supprimer tous les non clustered index « traditionnels », ils sont inutiles.

Après cette courte introduction, passons désormais à l’implémentation de notre package SSIS.

 

Implémentation du partition switching avec SSIS

Cas d’utilisation

Afin de réaliser mon package template, je vais prendre un cas d’utilisation classique en BI :

Les données de ventes doivent être importées quotidiennement. Chaque nuit, les données de ventes de la veille sont extraites de l’ERP source dans un fichier plat au format CSV. Les données de ventes ne sont jamais modifiées, et comme dans ton bon système transactionnel digne de son nom, nous n’avons jamais d’opérations antidatées ! Pour notre exemple nous partons sur une volumétrie approximative d’1 millions d’enregistrements par jour.

Pour résumé, mon package ne doit intégrer tous les jours qu’un seul fichier. Mais parce que je suis prudent et clairvoyant j’envisage différents scénarios  :

  • recharge d’une journée spécifique
  • recharge d’un mois spécifique
  • recharge complète de tout l’historique …

Bref, que des cas tordus qu’il vaut mieux envisager dès maintenant. Le but du jeu étant de garantir un temps de traitement constant en fonction de la volumétrie chargée  tout en maintenant la gestion des contraintes d’intégrité et la non fragmentation des indexes. Ainsi si mon client me demande de recharger tout l’historique, je suis capable de lui annoncer un temps de traitement cohérent (temps de traitements = n partitions à charger * temps moyen de chargement d’une partition)

Pré requis

Pour l’implémentation de mon package, je pars sur les pré requis suivants :

  • la table partitionnée doit contenir un index clustered (je rappelle au passage que cet index doit obligatoirement contenir la clé de partitionnement)
  • les fichiers utilisés respecte la convention de nommage suivante Sales_<YYYYMMDD>.csv
  • la généricité du Template proposé s’arrête n’a de limite que le format de fichier en entrée et le Data Flow Task de chargement/transformation des données dans la table de staging

Mise en place de mon environnement de test

Pour la mise en place de notre cas pratique, je pars sur un partitionnement mensuel et j’initialise mon plan de partitionnement à l’aide de mon package d’administration des partitions. Une fois le paramétrage effectué et le package exécuté, je me retrouve donc avec l’ensemble des fichiers, filegroup, fonctions et schémas de partitionnement nécessaires.

PartitionsObjects

Je crée ensuite la table de fait partitionnée des ventes dbo.FactSalesPartitioned avec :

  • 6 contraintes d’intégrité référentielles
  • un clustered index contenant la clé de partitionnement que j’utilise, à savoir la date de vente OrderDateKey. Cette dernière respecte le format YYYYMMDD et est stockée dans un entier.
  • et pour le fun un ColumnStore index couvrant l’intégralité des colonnes de ma table (15 colonnes)

FactSalesPartitioned

Commençons la mise œuvre de notre package

1. Déclaration des paramètres de notre package

  • Le package doit avoir en entrée les informations suivantes
  • Le nom de la table partitionnée à alimenter (partitionedTable)
  • Le pattern de nommage de la table de staging (stagingTablePattern)
  • Le dossier contenant les fichiers à charger (sourceFilePath)
  • Le pattern de fichier à charger (filePattern)
  • Le dossier où archiver les fichiers chargés (archiveFilePath)
  • Un flag indiquant si la compression de données doit être désactivée avant insertion (

N.B : je dois avouer que je n’ai pas déclaré de paramètres sur mon package. J’ai utilisé directement des variables. Libre à vous de le faire et d’initialiser les variables citées ci-dessus avec les valeurs de vos paramètres.

2. Dans un premier temps, je récupère l’object_id associé à la table partitionnée ainsi que la fonction de partition utilisée à l’aide d’un simple SQL Task et de la requête SQL suivante :

declare @factTable nvarchar(256)= ?

select
tables.object_id,
partition_functions.function_id
from
sys.tables
inner join
sys.schemas
on schemas.schema_id = tables.schema_id
inner join
sys.indexes
on indexes.object_id=tables.object_id
inner join
sys.partition_schemes
on partition_schemes.data_space_id = indexes.data_space_id
inner join
sys.partition_functions
on partition_functions.function_id = partition_schemes.function_id
where indexes.type=1
and tables.object_id = object_id(@factTable)

3. Afin d’effectuer un chargement optimum, nous devons nous assurer de traiter les fichiers par paquets. Chaque paquet devant être rattaché à une et une seule partition.
Nous devons donc confronter la liste des fichiers à charger dans notre table avec les informations exhaustive de partitionnement.

Une fois ces informations récupérées, nous pouvons ainsi déterminer si une partition est à charger dans son intégralité ou bien partiellement. En effet, si le nombre de fichiers relatifs à une partition est égal au nombre de jours compris entre la borne min et la borne max de la partition, cela signifie qu’il s’agit d’un chargement de type truncate/insert; sinon nous ne chargeons qu’une partie des données de la partition, le mode de chargement est alors en delete/insert.

Toutes ces opérations sont réalisées à l’aide d’un Data Flow Task permettant d’alimenter une variable recordset (User::filesToLoad ) contenant toutes les informations nécessaires pour traiter l’ensemble des fichiers à intégrer. (Une fois de plus je vous renvoie à l’article Chargement efficient de fichiers avec SSIS)

Le Data Flow Task ainsi implémenté, nous permet de récupérer pour chaque fichier :

  • le nom complet du fichier
  • le numéro de la partition dans laquelle le fichier doit être chargé
  • le filegroup associé à la partition
  • la date des données contenues dans le fichier
  • un flag permettant de déterminer s’il s’agit du premier fichier à traiter pour la partition concernée
  • un flag permettant de déterminer s’il s’agit du dernier fichier à traiter pour la partition concernée
  • le type de compression à utiliser sur la partition concernée
  • un flag si la partition est à charger partiellement ou complètement
  • la colonne utilisée comme clé de partitionnement

Notez qu’avant l’insertion dans le recordset, je tri les données par n° de partition et par date de données de façon à pourvoir gérer mes « paquets » de fichiers.

N.B : si vous avez de plus forte volumétrie, vous pouvez toujours créé une partition par jour. Pour info, SQL Server 2012 64 bits peut gérer jusqu’à 15 000 partitions par table (si on descend à la journée, cela représente environ 40 ans de données)

4. Une fois les informations collectées dans un recordset, il me reste à implémenter la logique de traitement des fichiers dans une boucle. J’utilise donc un ForEach Loop Container me permettant de récupérer pour chaque fichier les informations citées précédemment.
Le paramétrage du container est le suivant :

FELC - Loop Over Files To (Re)Load - Settings

Le premier fichier de chaque partition doit déclencher la création de la table de staging. Je rappelle que celle-ci doit avoir exactement le même schéma que la table source. Pour créer la table de staging, j’utilise un SQL Task prenant en paramètre :

  • le nom de la table partitionnée qui doit être chargée (User::partitionedTable)
  • le nom de la table de staging à construire (User::stagingTable)
  • le filegroup de destination pour la partition concernée (User::filegroupname)
  • le numéro de la partition à charger (User::partitionNumber)
  • un flag indiquant si la table de staging doit être tronquée après le switch de partition (User::truncateStagingTable). Si la partition est chargée intégralement, c’est à dire si le nombre de fichiers relatifs à la partition est égal au nombre de jour compris entre les bornes de la partitions, alors elle peut être tronquée, et nous pouvons supprimer tous les indexes. Sinon, seul le ColumnStore Indexe doit être supprimé de façon à optimiser les éventuelles suppression de données en utilisant l’index clusterd
  • le type de compression à utiliser sur la partition (User::partitionCompressionType)
  • un flag indiquant si la compression doit être désactivée avant le chargement des fichiers (User::removeCompressionBeforeBulkInsert)

Lors de la création de la table de staging, nous récupérons le script complet de (re-)création des indexes pour les récréer sur la table de stating après le chargement complet de la partition. Cette commande est stockée dans la variable User::sqlStmtCreateAllIndexes.

Le SQL Task est donc paramétré comme ceci :

SQL - Prepare Staging Table - Settings

La création de la table de staging s’effectue dynamiquement en récupérant les metadata de la table partitionnée cible :

Une fois la table de staging créée, je switch les données de partition de la table cible vers la table de staging, je désactive la compression, supprime le ColumnStore index s’il y en a un, et si la partition est complètement chargée, je la tronque et supprime tous les autres indexes

Au final on obtient le script suivant :

</pre>
declare @partitionedTable nvarchar(256) = ?
declare @stagingTable nvarchar(256) = ?
declare @filegroupName nvarchar(128) = ?
declare @partitionNumber int = ?
declare @truncateStagingTable int = ?
declare @partitionCompressionType nvarchar(4) = ?
declare @removeCompressionBeforeBulkInsert int = ?
declare @sqlStmt nvarchar(max)
declare @sqlStmtCreateAllIndexes nvarchar(max)
declare @paramDefinition nvarchar(max)

-- create staging table based on destination table structure
SELECT
@sqlStmt=
'if exists (select 1 from sys.tables where tables.object_id=object_id(''' + @stagingTable + ''')) ' + CHAR(10)
+ 'drop table ' + @stagingTable + ';' + CHAR(10)
+ 'create table ' + @stagingTable + ' (' + cols.list + ')' + ' ON [' + @filegroupName + '];'
from
sysobjects
cross apply
(
select STUFF(
(
SELECT
', ' + ' [' + column_name + '] ' + data_type +
case data_type
when 'sql_variant' then ''
when 'text' then ''
when 'decimal' then '(' + cast(numeric_precision_radix as varchar) + ', ' + cast(numeric_scale as varchar) + ')'
else coalesce('('+case when character_maximum_length = -1 then 'MAX' else cast(character_maximum_length as varchar) end +')','')
end + ' ' +
case
when exists (select id from syscolumns where object_name(id)=sysobjects.name and name=column_name and columnproperty(id,name,'IsIdentity') = 1) then
'IDENTITY(' + cast(ident_seed(sysobjects.name) as varchar) + ',' + cast(ident_incr(sysobjects.name) as varchar) + ')'
else ''
end + ' ' +
(
case when IS_NULLABLE = 'No' then 'NOT ' else '' end ) + 'NULL ' +
case when information_schema.columns.COLUMN_DEFAULT IS NOT NULL THEN 'DEFAULT '+ information_schema.columns.COLUMN_DEFAULT ELSE '' END
from information_schema.columns
where table_name = sysobjects.name
order by ordinal_position
FOR XML PATH('')
),1,1,'')
) cols (list)
where sysobjects.xtype = 'U'
AND sysobjects.id = object_id(@partitionedTable)
execute sp_executesql @sqlStmt

-- create foreign key based on destination table structure
select @sqlStmt =
cast(
(
SELECT
'ALTER TABLE ' + @stagingTable + ' WITH ' + CASE WHEN IsDisabled=1 THEN ' NOCHECK ' ELSE ' CHECK ' END +
' ADD CONSTRAINT ' + FkConstraintName + ' FOREIGN KEY (' + STUFF(FKeyColumns,1,1,'') + ')' +
' REFERENCES ' + PkTableSchema + '.' + PkTableName + '(' + STUFF(PKeyColumns,1,1,'') + ')' +
' ON UPDATE ' + CASE UpdateReferentielAction WHEN 0 THEN ' NO ACTION ' WHEN 1 THEN ' CASCADE ' ELSE ' SET_NULL 'END +
' ON DELETE ' + CASE DeleteReferentialAction WHEN 0 THEN ' NO ACTION ' WHEN 1 THEN ' CASCADE ' ELSE ' SET_NULL 'END +
CASE WHEN IsNotForReplication=1 THEN ' NOT FOR REPLICATION ' ELSE '' END + ';' + char(10) as 'data()'
from
(
select
'[' + foreign_keys.name + '_' + cast(@partitionNumber as nvarchar(8)) + ']' as FkConstraintName,
'[' + OBJECT_NAME(foreign_keys.parent_object_id) + ']' as FkTableName,
'[' + schema_name(foreign_keys.schema_id) + ']' as FkTableSchema,
'[' + tables.name + ']' as PkTableName,
'[' + schema_name(tables.schema_id) + ']' as PkTableSchema,
(
select ',[' + col_name(foreign_keys.parent_object_id, foreign_key_columns.parent_column_id) + ']' as 'data()'
from sys.foreign_key_columns
where foreign_key_columns.constraint_object_id = foreign_keys.object_id
order by foreign_key_columns.constraint_column_id
for xml path('')
) as FKeyColumns,
(
select ',[' + col_name(foreign_keys.referenced_object_id, foreign_key_columns.referenced_column_id) + ']' as 'data()'
from sys.foreign_key_columns
where foreign_key_columns.constraint_object_id = foreign_keys.object_id
order by foreign_key_columns.constraint_column_id
for xml path('')
) as PKeyColumns,
foreign_keys.is_disabled as IsDisabled,
foreign_keys.is_not_for_replication as IsNotForReplication,
foreign_keys.delete_referential_action as DeleteReferentialAction,
foreign_keys.update_referential_action as UpdateReferentielAction
from
sys.foreign_keys
LEFT OUTER JOIN sys.tables ON tables.OBJECT_ID = foreign_keys.referenced_object_id
where
foreign_keys.parent_object_id = object_id(@partitionedTable)
) ForeignKeys
for xml path('')
)
as nvarchar(max))

execute sp_executesql @sqlStmt

--getting indexes definition from destination table and store them into temporary table
select
'IF EXISTS (SELECT 1 from sys.indexes where object_id=' + cast(object_id(@stagingTable) as nvarchar) + ' and name=''' + indexes.name +''')' + char(10)
+ case
when indexes.is_primary_key = 1 then
' ALTER TABLE ' + @stagingTable + ' DROP CONSTRAINT [' + indexes.name + '];' + char(10)
+ 'ALTER TABLE ' + @stagingTable + ' ADD CONSTRAINT [' + indexes.name + '] PRIMARY KEY '
+ case when INDEXPROPERTY (indexes.object_id,indexes.name,'ISCLUSTERED') = 1 then ' CLUSTERED ' else ' NONCLUSTERED ' end
+ '(' + key_cols.list + ')'
else
'DROP INDEX [' + indexes.name + '] ON ' + @stagingTable + ';' + char(10)
+ 'CREATE '
+ case when indexes.is_unique= 1 then ' UNIQUE ' else '' end
+ case indexes.type when 1 then ' CLUSTERED ' when 2 then ' NONCLUSTERED ' when 6 then ' NONCLUSTERED COLUMNSTORE ' end
+ ' INDEX [' + indexes.name + '] '
+' ON ' + @stagingTable + ' '
+ CASE when indexes.type in (1,2) then '(' + key_cols.list + ')' else '' end
+ CASE
when included_cols.list IS NULL then ''
when indexes.type in (1,2) then ' INCLUDE (' + included_cols.list + ')'
when indexes.type = 6 then '(' + included_cols.list + ')'
else ''
end
+ case when indexes.has_filter=1 then ' WHERE ' + indexes.filter_definition else '' end
end
+ case
when indexes.type in (1,2) then
' WITH ('
+ ' PAD_INDEX = ' + case when indexes.is_padded=0 then ' OFF' else ' ON' end
+ ', STATISTICS_NORECOMPUTE = ' + case when INDEXPROPERTY (indexes.object_id,indexes.name,'IsAutoStatistics') = 0 then ' OFF' else ' ON' end
+ ', IGNORE_DUP_KEY = ' + case when indexes.ignore_dup_key=0 then ' OFF' else ' ON' end
+ ', ALLOW_ROW_LOCKS = ' + case when indexes.allow_row_locks=0 then ' OFF' else ' ON' end
+ ', ALLOW_PAGE_LOCKS = ' + case when indexes.allow_page_locks=0 then ' OFF' else ' ON' end
+ case when indexes.fill_factor = 0 then '' else ' , FILLFACTOR = ' + cast(indexes.fill_factor as nvarchar(10)) end
+ ')'
else
''
end
+ ' ON [' + data_spaces.name + ']' + case when data_spaces.type ='PS' then '(' + partition_cols.list + ')' else '' end + char(10)
+ ';' + char(10)
+ case
when indexes.is_disabled = 1 then 'ALTER INDEX [' + indexes.name + '] ON ' + @stagingTable + ' DISABLE' + char(10) + ';' + char(10)
else ''
end as createAllIndexes,
case when indexes.type=6 then 'DROP INDEX [' + indexes.name + '] ON ' + @stagingTable + ';' + char(10) else NULL end as dropColumnStoreIndex,
case
when (indexes.is_primary_key=1 and indexes.type in (1,2)) then
' ALTER TABLE ' + @stagingTable + ' DROP CONSTRAINT [' + indexes.name + '];' + char(10)
when indexes.type in (1,2) then
'DROP INDEX [' + indexes.name + '] ON ' + @stagingTable + ';' + char(10)
-- when indexes.type=6 then
-- 'DROP INDEX [' + indexes.name + '] ON ' + @stagingTable + ';' + char(10)
else
null
end as dropAllIndexes,
case when data_spaces.type ='PS' and indexes.type=1 then partition_cols.list else null end as partitionKey
into #stagingTableIndexes
from
sys.indexes
inner join
sys.objects
on objects.object_id = indexes.object_id
inner join
sys.schemas
on schemas.schema_id = objects.schema_id
inner join
sys.data_spaces
on data_spaces.data_space_id = indexes.data_space_id
cross apply
(
select
STUFF(
(
select ',[' + syscolumns.name + ']' + case when index_columns.is_descending_key=1 then ' desc' else ' asc' end as 'data()'
from
sysindexkeys
INNER JOIN
syscolumns
ON SYSCOLUMNS.id = sysindexkeys.id
AND SYSCOLUMNS.colid = sysindexkeys.colid
inner join
sys.index_columns
on index_columns.object_id = sysindexkeys.ID
and index_columns.index_id = sysindexkeys.indid
and index_columns.column_id = syscolumns.colid
where sysindexkeys.id=indexes.object_id
and sysindexkeys.indid=indexes.index_id
and index_columns.key_ordinal > 0
order by index_columns.key_ordinal
for xml path('')
),1,1,'')
) key_cols (list)
cross apply
(
select
STUFF(
(
select ',[' + syscolumns.name + ']' as 'data()'
from
sysindexkeys
INNER JOIN
syscolumns
ON SYSCOLUMNS.id = sysindexkeys.id
AND SYSCOLUMNS.colid = sysindexkeys.colid
inner join
sys.index_columns
on index_columns.object_id = sysindexkeys.ID
and index_columns.index_id = sysindexkeys.indid
and index_columns.column_id = syscolumns.colid
where sysindexkeys.id=indexes.object_id
and sysindexkeys.indid=indexes.index_id
and index_columns.is_included_column = 1
order by syscolumns.colid
for xml path('')
),1,1,'')
) included_cols (list)
cross apply
(
select
STUFF(
(
select ',[' + syscolumns.name + ']' as 'data()'
from
sysindexkeys
INNER JOIN
syscolumns
ON SYSCOLUMNS.id = sysindexkeys.id
AND SYSCOLUMNS.colid = sysindexkeys.colid
inner join
sys.index_columns
on index_columns.object_id = sysindexkeys.ID
and index_columns.index_id = sysindexkeys.indid
and index_columns.column_id = syscolumns.colid
where sysindexkeys.id=indexes.object_id
and sysindexkeys.indid=indexes.index_id
and index_columns.partition_ordinal > 0
order by index_columns.partition_ordinal
for xml path('')
),1,1,'')
) partition_cols (list)
where indexes.type in (1,2,6)
and indexes.is_hypothetical = 0
and objects.type='U'
and indexes.object_id=object_id(@partitionedTable)

-- create indexes on staging table (based on destination table structure)
select @sqlStmt=
(
select createAllIndexes as 'data()'
from #stagingTableIndexes
where createAllIndexes is not null
for xml path(''), root('MyString'), type
).value('/MyString[1]','nvarchar(max)')

if isnull(@sqlStmt,'') > ''
begin
execute sp_executesql @sqlStmt
end
set @sqlStmtCreateAllIndexes = @sqlStmt

--apply compression on staging table
if upper(ltrim(rtrim(isnull(@partitionCompressionType,'')))) not in ('','NONE')
begin
set @sqlStmt = 'ALTER TABLE ' + @stagingTable + ' REBUILD PARTITION = ' + cast(@partitionNumber as nvarchar) + ' WITH(DATA_COMPRESSION = ' + @partitionCompressionType + ')'
execute sp_executesql @sqlStmt
end

--partition switching between table dest and staging table on current bound
set @sqlStmt = 'ALTER TABLE ' + @partitionedTable + ' SWITCH PARTITION ' + cast(@partitionNumber as nvarchar) + ' TO ' + @stagingTable + ' PARTITION ' + cast(@partitionNumber as nvarchar)

execute sp_executesql @sqlStmt
--disable compression on staging table

if upper(ltrim(rtrim(isnull(@partitionCompressionType,'')))) not in ('','NONE') and @removeCompressionBeforeBulkInsert = 1
begin
set @sqlStmt = 'ALTER TABLE ' + @stagingTable + ' REBUILD PARTITION = ' + cast(@partitionNumber as nvarchar) + ' WITH(DATA_COMPRESSION = NONE)'
execute sp_executesql @sqlStmt
end

--drop columnstore index
select @sqlStmt= dropColumnStoreIndex from #stagingTableIndexes where dropColumnStoreIndex is not null

if (@sqlStmt is not null)
begin
execute sp_executesql @sqlStmt
end

if (@truncateStagingTable = 1)
begin

--truncate staging table data related to current bound
set @sqlStmt = 'truncate table ' + @stagingTable
execute sp_executesql @sqlStmt

--drop all other indexes
select @sqlStmt=
(
select dropAllIndexes as 'data()'
from #stagingTableIndexes
where dropAllIndexes is not null
for xml path(''), root('MyString'), type
).value('/MyString[1]','nvarchar(max)')
execute sp_executesql @sqlStmt
end

select ?=@sqlStmtCreateAllIndexes

drop table #stagingTableIndexes

Si la partition n’est pas chargée en intégralité, je supprime les données de la table de staging relatives au fichier chargées. Là encore j’utilise un SQL Task qui prend en paramètre les informations suivantes :

  • le nom de la table de staging (User::stagingTable)
  • le nom de la clé de partitionnement (User::partitionKeyColumn)
  • la date (User::filedate)

ce qui me permet de construire dynamiquement puis d’exécuter la requête de suppression pour chaque fichier chargé  :

declare @stagingTable nvarchar(256) = ?
declare @partitionKeyColumn nvarchar(256) = ?
declare @filedate int = ?
declare @sqlStmt nvarchar(max)

set @sqlStmt = 'DELETE FROM ' + @stagingTable + ' WHERE ' + @partitionKeyColumn + ' = ' + cast(@filedate as nvarchar(8))
execute sp_executesql @sqlStmt

Je charge ensuite le fichier dans la table de staging à l’aide d’un Data Flow Task. C’est là que vous pouvez gérer la transformation des données et c’est là que la généricité s’arrête. Une fois le fichier chargé, il est déplacé dans un dossier d’archivage à l’aide d’un File System Task.

Tant qu’il ne s’agit pas du dernier fichier, la boucle ne fait que :

  • supprimer les données relatives au fichier (seulement si la partition est chargée partiellement),
  • intégrer les données de chaque fichier dans la table de staging
  • archiver le fichier chargé

Une fois le dernier fichier de chaque partition traité, il suffit de gérer le partition switching de la table de staging vers la table cible. Cela s’effectue en 4 étaptes :

  • reconstruction de tous les indexes
  • mise en place de la compression (s’il y a lieu)
  • switch de partition de la table de staging vers la table civle
  • et suppression de la table de staging

Comme à mon habitude, vous pouvez constater qu’on obtient un package très simple et très facilement réutilisable :

PKG - TemplateTablePartitionSwitching

 

Benchmark

Une fois le package implémenté et testé dans tous les sens (çà a été long, ca fait plus d’un mois que je suis dessus, bon je triche il y a eu les vacances de noël, un reprise difficile et il a fallu se remotivé au mois de janvier) voyons le résultat de nos premiers tests :

bench_partition_switching_csi_compression

Tout çà son mon pc portable dont les specs sont les suivantes :

  • DELL i5-3210M 2,50GHz
  • 16 GB DDR-3
  • DD SSD OCZ-VERTEX 4 de 256 GB

Alors imaginez sur un serveur, avec toutes les possibilités qui s’offre à vous :

  • possibilité de reconstruire les indexes en utilsant la tempdb (je rappelle au passage qu’elle doit être sur un disque dédié, si ce n’est pas le cas c’est pas bien !)
  • Les fichiers à intégrer sont en générale sur des disques différents de vos fichiers de bdd
  • Et si on pousse à l’extrême, de disposer d’un disque par partition. Nous pourrions alors implémenter un package enfant paramétré qui soit à l’image du contenu du For Each Loop Container. Ce dernier pourrait alors être exécuté en parallèle pour chaque partition à l’aide d’un Excecute Package Task.
  • Etc.

 

 

Conclusion

Je rappelle au passage un petit dicton bien connu mais très souvent oublié « qui peut le plus, peut le moins ». A savoir, si en nominal vous n’avez qu’un seul fichier à traiter, prévoyez toujours le fait d’avoir à recharger un ensemble de fichiers et concevez votre package de façon à optimiser leur intégration. Je dis cela pour plusieurs raisons dont les principales sont les suivantes :

  • #3 : les gros plantages avec une stratégie de backup bien foireuse çà existe
  • #2 : les coupures de chaines d’alimentation avec reprise sur plusieurs jours çà existe aussi
  • #1 : lors de la conception de votre entrepôt vous serez souvent amener à le vider complètement et à le recharger. Il vaut mieux que cela ne prenne que quelques heures plutôt que quelques jours.

Vous pouvez télécharger ce package sur Skydrive http://sdrv.ms/VlpB3A

Sans oublier la petite devise shadok (il ne m’en reste plus beaucoup dans les bacs ….)

d_21

Laisser un commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Image Twitter

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Photo Google+

Vous commentez à l'aide de votre compte Google+. Déconnexion / Changer )

Connexion à %s