Export MS SQL tables to Parquet Files

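The following code exports Microsoft SQL Server tables to Parquet files with PySpark. It also works for tables that have no indexed numeric column (int, float, etc.) to partition on. It numbers the rows with ROW_NUMBER() and uses that generated column to split the read into parallel partitions.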

import sys
from pyspark.sql.session import SparkSession


def get_spark(jdbc_driver_path, master="local[10]"):
    """Return a SparkSession with the SQL Server JDBC driver on the driver classpath.

    Args:
        jdbc_driver_path: Path to the JDBC driver JAR (e.g. ``mssql-jdbc-*.jar``),
            passed to Spark through ``spark.driver.extraClassPath``.
        master: Spark master URL. Defaults to ``"local[10]"`` (local mode with
            10 worker threads), which is what this helper always used before the
            parameter existed.

    Returns:
        The session from ``getOrCreate()``. If a session is already active it is
        reused, and classpath settings may then not take effect.
    """
    builder = SparkSession.builder.master(master)
    builder = builder.config("spark.driver.extraClassPath", jdbc_driver_path)
    return builder.getOrCreate()


def get_sql_dataframe(host, database_name, table_name, order_by, upperBound, partition_num,
                      spark_session=None):
    """Read a SQL Server table into a Spark DataFrame with partitioned JDBC reads.

    SQL Server computes a synthetic ``row_num`` column,
    ``ROW_NUMBER() OVER (ORDER BY <order_by>)``. Spark uses it as the numeric
    ``partitionColumn``, so the table itself needs no numeric column. The helper
    column is dropped before the DataFrame is returned.

    NOTE: Spark issues one query per partition, and each query recomputes
    ROW_NUMBER(). If ``order_by`` is not a unique ordering, tied rows can be
    numbered differently in different queries, so rows may be duplicated or
    skipped. Use a unique column or add a tiebreaker.

    Args:
        host: SQL Server host name, inserted into the JDBC URL.
        database_name: Database to connect to.
        table_name: Table to read; inserted verbatim into the SQL text.
        order_by: ORDER BY expression for ROW_NUMBER(), inserted verbatim
            (e.g. ``"[Col] DESC"``).
        upperBound: Upper bound used by Spark to compute the partition stride
            (the script passes the table's row count). Like ``lowerBound``, it
            does not filter rows.
        partition_num: Number of partitions (JDBC queries) to request.
        spark_session: SparkSession to read with. When omitted, the module-level
            ``spark`` global is used, as before.

    Returns:
        A DataFrame containing the table's columns, without ``row_num``.
    """
    # Fall back to the module global only when no session is passed, so the
    # function works when imported without the __main__ block having run.
    session = spark if spark_session is None else spark_session
    url = "jdbc:sqlserver://{host};databasename={database_name};IntegratedSecurity=true".format(
        host=host, database_name=database_name)
    query = "(SELECT ROW_NUMBER() OVER (ORDER BY {order_by}) AS row_num, * FROM {table_name}) as tmp".format(
        order_by=order_by, table_name=table_name)
    return (
        session.read.format("jdbc")
        .option("url", url)
        .option("dbtable", query)
        .option("partitionColumn", "row_num")
        # Kept at 0 (not 1): Spark rejects lowerBound > upperBound, which an
        # empty table (upperBound == 0) would otherwise trigger.
        .option("lowerBound", 0)
        .option("upperBound", upperBound)
        .option("numPartitions", partition_num)
        .load()
        .drop('row_num')
    )


def get_count(host, database_name, table_name, spark_session=None):
    """Return the number of rows in ``table_name``.

    Uses ``COUNT_BIG(*)`` instead of ``COUNT(*)``. In SQL Server, ``COUNT``
    returns ``int`` and fails with an arithmetic overflow on tables with more
    than 2**31 - 1 rows; ``COUNT_BIG`` returns ``bigint``.

    Args:
        host: SQL Server host name, inserted into the JDBC URL.
        database_name: Database to connect to.
        table_name: Table to count; inserted verbatim into the SQL text.
        spark_session: SparkSession to query with. When omitted, the
            module-level ``spark`` global is used, as before.

    Returns:
        The row count, read from the ``row_count`` column of the single
        result row.
    """
    # Fall back to the module global only when no session is passed.
    session = spark if spark_session is None else spark_session
    url = "jdbc:sqlserver://{host};databasename={database_name};IntegratedSecurity=true".format(
        host=host, database_name=database_name)
    query = "(SELECT COUNT_BIG(*) as row_count FROM {table_name}) as tmp".format(table_name=table_name)
    return (
        session.read.format("jdbc")
        .option("url", url)
        .option("dbtable", query)
        .load()
        .first()['row_count']
    )


def export(sql_df, output_folder):
    """Write ``sql_df`` to ``output_folder`` as Parquet, replacing any existing output there."""
    parquet_writer = sql_df.write.mode("overwrite")
    parquet_writer.parquet(output_folder)


if __name__ == "__main__":
    # Database configuration: replace the bracketed placeholders before running.
    host = '[Host Name]'
    database_name = '[Database Name]'
    table_name = '[Table Name]'
    # ORDER BY used to number rows for partitioning. It should be a unique
    # ordering (or include a tiebreaker) so that every partition query numbers
    # rows the same way.
    order_by = '[Indexed Column] DESC'
    jdbc_driver_path = 'mssql-jdbc-6.4.0.jre8.jar'

    # Output
    output_folder = '[Output Folder]'
    partition_num = 1000  # number of JDBC read partitions (queries) to request

    # `spark` is bound at module scope here; the helper functions above look it
    # up as a module global.
    spark = get_spark(jdbc_driver_path)
    try:
        total_count = get_count(host, database_name, table_name)

        # print(...) with a single argument behaves the same on Python 2 and 3;
        # a Python 2 `print` statement is a SyntaxError on Python 3.
        print(str(total_count) + " rows will be exported.")

        sql_df = get_sql_dataframe(host, database_name, table_name, order_by, total_count, partition_num)

        export(sql_df, output_folder)
    finally:
        # Release the Spark context even if counting or exporting fails.
        spark.stop()

Leave a Reply

Your email address will not be published. Required fields are marked *