logo le blog invivoo blanc

Comment trier des fichiers CSV ?

24 septembre 2018 | Python | 0 comments

1. Contexte

Deux fois au cours de ma carrière, on m’a donné le projet de comparer deux fichiers CSV de grande taille (Comma Separated Values : des fichiers textes correspondant soit à un export de fichier Excel soit un export de tables de bases de données) :

  • Dans le premier cas, il s’agissait de faire un rapprochement entre des trades saisis dans un progiciel customisé dans un pays P1 indépendant et ceux saisis dans le même progiciel customisé différemment dans un pays P2. Il fallait d’abord transcoder les informations puis reconnaitre ceux qui étaient absents du système de Paris et ceux qui avait été modifiés afin de mettre à jour les données de la base de Paris. Chaque fichier faisait environ 300Mb et les ordinateurs avaient environ 1Gb de RAM. Et le développement est à réaliser en 3j maximum. (Le principe : on veut le résultat pour hier avec un temps d’exécution inférieur à 8h).
  • Dans le second cas, il fallait rapprocher des fichiers de flux entre deux progiciels différents (l’ancien qui devait être décommissionné et le remplaçant) qui tournaient en concurrence afin de faire des non-régressions. Chaque fichier faisait 2.5Gb et l’objectif était de les comparer journalièrement en moins de 2h. Il fallait que le développement soit opérationnel en moins de deux semaines.

Pour pouvoir comparer, la première étape (et la plus complexe) consiste à trier les 2 fichiers : en effet en triant on pourra diminuer le nombre de lignes lues ainsi que le nombre de comparaison. Et malheureusement, trier le fichier nécessite de le charger et quand celui-ci est volumineux cela devient compliqué.

2. Choix techniques

2.1. Langage

Le choix du langage dépend fortement du contexte :

  • Si vous avez tout le temps de développement nécessaire, utilisez un langage compilé comme C/C++ ou compilé à la volée comme C# qui vous fournira d’excellentes performances et une gestion fine de la mémoire consommée.
  • Si votre besoin nécessite un temps de développement court il faut choisir un langage de script tels que Perl ou Python.

Pour l’exemple, dans ce billet, nous allons utiliser le langage Python afin d’avoir le code le plus compact et lisible possible vous permettant de vous approprier la méthode puis de de la transposer rapidement dans un autre langage.

2.2 – Algorithme

Le choix de l’algorithme va conditionner les performances ainsi que la faisabilité. En effet « charger un fichier CSV » de 2.5Gb en mémoire RAM n’est pas possible à moins d’avoir une capacité mémoire exceptionnelle interdisant de fait de trier le fichier en mémoire. Faites des essais et vous constaterez que la consommation mémoire est presque 10 fois plus importante dès lors que l’on a découpé les lignes en vecteur de valeurs.
Voici un petit exemple :

def read_csv( filename ):
    results = []
    with open( filename, 'r' ) as file:
        for line in file:
            line    = line.rstrip()
            columns = line.split( ';' )
            results.append( columns )
    return results

path    = "C:\\personnel\\python\\blog\\compare_csv"
before  = psutil.virtual_memory()
results = read_csv( path + "\\sample1.csv" )
after   = psutil.virtual_memory()
print( "psutil.size =", after.used - before.used )

Si on utilise un fichier csv de 39Mo, on obtient 341Mo en mémoire.
La solution consiste à utiliser un tri de fusion externe : on divise le gros fichier en plusieurs fichiers de même taille que l’on trie indépendamment les uns des autres puis on reconstruit le fichier par fusion des fichiers triés. On peut se renseigner sur les détails de la méthode sur les sites suivants :

3. Diviser pour régner

La première partie de l’algorithme consiste à diviser le fichier principal en sous-fichiers de taille équivalente (on va se baser sur un nombre de lignes du fichier d’origine) que l’on trie indépendamment les uns des autres. On découpe le fichier filename. Dans le répertoire temporary_path, on place les fichiers découpés. Chaque fichier contiendra rows_count lignes et sera trié en utilisant la fonction ‘sort’ des listes puis en passant la fonction get_key (cette fonction extrait des colonnes du fichier initial un tuple servant de clé de tri) dans l’argument key.

def split_file( filename, temporary_radix, rows_count, get_key ):
    """ Split file
        Split input file in several sorted files of rows_count lines.
    
        Parameters:
            filename: input full path name
            temporary_radix: full path radix to create splitted sorted files
            get_key: the function to extract sorting key from columns
        
        Return:
            number of generated files
    """
    # local variables
    file_counter = 0
    
    # write sub file
    def write_file( filename, rows ):
        rows.sort( key = get_key )
        with open( filename, 'w' ) as output_file:
            for r in rows:
                print( *r, sep = ';', end = '\n', file = output_file )
    
    # loop
    with open( filename, "r" ) as input_file:
        rows = []
        for line in input_file:
            line    = line.rstrip()
            columns = line.split( ';' )
            rows.append( columns )
            if len( rows ) == rows_count:
                write_file( temporary_radix % ( 0, file_counter ), rows )
                rows.clear()
                file_counter += 1
        if len( rows ) > 0:
            write_file( temporary_radix % ( 0, file_counter ), rows )
            file_counter += 1
    return file_counter

4. Fusionner pour gagner

La seconde partie de l’algorithme consiste à fusionner les fichiers 2 par 2 en veillant à conserver le tri. On va lire une ligne à la fois dans chaque fichier, les comparer et écrire la plus petite dans le fichier résultat puis remplacer la ligne écrite par la suivante du même fichier d’origine. Le risque c’est la famine (on a consommé toutes les lignes d’un des 2 fichiers mais pas celle de l’autre), il faudra faire évoluer la fonction de comparaison pour être capable de gérer des lignes vides : on oriente la ligne (bien définie) vers le fichier de sortie.

def fusion( temporary_radix, depth, id1, id2, new_id, get_key ):
    """ File fusion
        F(id1) + F(id2) -> F(new_id)
    
        Parameters:
            temporary_radix: full path radix to read/write temporary files
            depth: depth
            id1: id of 1st file ("%s\\temp_%d_%d.csv" % ( temporary_path, depth, id1 ))
            id2: id of 2nd file ("%s\\temp_%d_%d.csv" % ( temporary_path, depth, id2 ))
            new_id: id of the new file ("%s\\temp_%d_%d.csv" % ( temporary_path, depth+1, new_id ))
            get_key: function to extract the columns from the line
    """
    # local variables
    columns1 = None
    columns2 = None
    name1    = temporary_radix % (     depth,    id1 )
    name2    = temporary_radix % (     depth,    id2 )
    name_o   = temporary_radix % ( depth + 1, new_id )
    
    # compare
    def compare():
        if None == columns1:
            return 1
        elif None == columns2 or get_key( columns1 ) < get_key( columns2 ):
            return -1
        else:
            return 1
    
    # open the files
    with open( name1,  "r" ) as inFile1, \
         open( name2,  "r" ) as inFile2, \
         open( name_o, "w" ) as outFile:

        while True:
            # read lines
            if not columns1:
                line = inFile1.readline().rstrip()
                if 0 != len( line ):
                    columns1 = line.split( ';' )
            if not columns2:
                line = inFile2.readline().rstrip()
                if 0 != len( line ):
                    columns2 = line.split( ';' )
            if not columns1 and not columns2:
                break

            # compare
            if compare() < 0:
                print( *columns1, sep = ';', end = '\n', file = outFile )
                columns1 = None
            else:
                print( *columns2, sep = ';', end = '\n', file = outFile )
                columns2 = None

    # finalize
    unlink( name1 )
    unlink( name2 )

5. Un tri sinon rien

L’objectif de cette fonction est d’organiser le découpage puis les fusions jusqu’à obtenir un fichier trié. Une fois découpé, on va fusionner les fichiers 2 par 2 : à chaque étape on divise le nombre de fichiers par 2. Il faut donc poursuivre jusqu’à ce qu’il n’y ait plus qu’un fichier. Et dans le cas où à une étape il y a un nombre impair de fichiers, il y aura un fichier à recopier de l’étape i à i+1.

def sort_file( input_filename, \
               sorted_filename, \
               rows_count, \
               temporary_radix = "C:\\Temp\\temp_%d_%d.csv", \
               get_key         = lambda c: c ):
    """Sort file
    
       Parameters:
           input_filename: input file name (file to sort)
           sorted_filename: output file name (sorted file)
           temporary_radix: full path radix name for temporary files ({path}\\temp_{depth}_{id}.csv)
           rows_count: number of rows per file in splitting phasis
           get_key: key function to sort
    """
    # initialize
    count   = split_file( input_filename, temporary_radix, rows_count, get_key )
    depth   = 0
    
    # main loop: keep 2 files, join them into one sorted file until all files are consumed
    while 1 < count:
        n = 0
        for i in range( 0, count, 2 ):
            j = i + 1
            if count == j:
                rename( temporary_radix % (     depth, i ), \
                        temporary_radix % ( depth + 1, n ) )
            else:
                fusion( temporary_radix, depth, i, j, n, get_key )
            n += 1

        depth += 1
        count  = n
    
    # rename temporary file into attempted file
    rename( temporary_radix % ( depth, 0 ), sorted_filename )

Voici un exemple d’appel de la fonction de tri :

get_key = lambda c: (c[1],)
sort_file( "C:\\personnel\\python\\blog\\compare_csv\\sample1.csv", \
           "C:\\personnel\\python\\blog\\compare_csv\\sample1_sorted.csv", \
           10000, \
           "C:\\personnel\\python\\blog\\compare_csv\\temp\\temp_%d_%d.csv", \
           get_key )

6. Conclusion

La méthode permet de remplacer des problématiques d’allocation/gestion mémoire par des accès fichiers. La performance dépendra donc de 2 paramètres :

  • Le temps d’accès au disque :
    évitez d’utiliser un lecteur réseau pour gérer les fichiers temporaires, préférez un disque dur local.
  • Le nombre de lignes lors du découpage :
    Moins le nombre est élevé, plus le nombre de fichiers est important et donc plus l’accès disque sera coûteux.

Il faut calibrer l’algorithme en fonction de la machine utilisée (quantité de mémoire et disque dur) pour exécuter l’opération et le format du fichier CSV. Cela permettra de faire un choix efficace du nombre de lignes pour le découpage.