SQL Server – Most Efficient Way to Load a CSV File into SQL Server 2016

c# csv sql-server sql-server-2016

Alright, so this question seems simple up-front. Well, there's a bit more to it than just loading the file.

The entire story goes like this:
For some reason, our client sends us what can only be described as a relational database, flattened, condensed into a single csv file (but the delimiter is a tilde instead of a comma). It's kinda terrible, actually; the same data gets repeated ad infinitum throughout the entire file. So, in order to bring some order back into this data, I load it into an actual relational database. Because of the volume of data, loading it into a database makes it easier to inspect the data for issues. And it also makes it much easier to export.

There are 53 fields per record and somewhere in the ballpark of 250,000 records per transmission. I want to split that into 6 normalized tables. I'm not sure whether to validate the data in the C# program or in the SQL Server 2016 LocalDB instance I'm using.

I'm not an experienced DBA; I'm a C# programmer who has dabbled in SQL a little. I feel comfortable enough with the syntax, but I want to make sure I'm doing this right.

Also, everything has to be completely automated. When a file comes in, the C# program starts and loads it into the database.

Let me explain the layout a bit more. Each line has 53 fields and contains a detail line for a statement (what they are being charged for, how frequently they were charged for the item, the total cost or credit for that item, etc.). The problem is that EVERY line also carries the information for the entire mailing: the payer, resident, property, and remittance. With that in mind, here is how I'm doing this now (a rough sketch of step 3 follows the list):

  1. Open the file
  2. For each line of the file, retrieve the keys for the tables that describe the mailing, payer, resident, property and remittance destinations.
  3. Compare that data with what is already cached. If there is no valid cached entry, query the DB to see if that entity has already been added; if not, create it. Then cache it.
  4. Add the new detail line and relate it to the mailing, which has a one to many relationship with details. (The mailing itself has a many to one relation to the payer, property, and remittance.)
  5. Close the file when finished.
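
To give a flavor of step 3, here is a stripped-down sketch of just the lookup-and-cache part for one entity (names simplified; createInDb stands in for the real query-or-insert logic, and the program does the same thing for each of the mailing, payer, resident, property, and remittance tables):

using System;
using System.Collections.Generic;

static class EntityCache
{
    // natural key from the file -> surrogate key already created in the DB
    private static readonly Dictionary<string, int> _payerIds =
        new Dictionary<string, int>();

    // Returns the cached ID if present; otherwise runs the query-or-insert
    // delegate once against the DB and caches the result.
    public static int GetOrCreatePayerId(string payerKey, Func<string, int> createInDb)
    {
        int id;
        if (!_payerIds.TryGetValue(payerKey, out id))
        {
            id = createInDb(payerKey);
            _payerIds[payerKey] = id;
        }
        return id;
    }
}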

It's not the slowest thing in the world, but it's all being done in RAM, and the program comes dangerously close to running out of memory in its current state. That's why we've decided to load the data into a database instead of keeping everything in memory. Hopefully that sheds some more light on this for potential answerers. Thank you!

Best Answer

My preferred approach for passing in multiple rows of multiple fields (i.e. not a simple delimited list) is to use Table-Valued Parameters (TVPs). The idea is that you read the file line by line in your .NET code, but stream the data into SQL Server via a TVP, either all at once or, if that is too much for one transaction, broken up into batches. The TVP is essentially a table variable that is an input parameter to a stored procedure, so as far as SQL Server is concerned this is a set-based operation, not row-by-row. Technically a stored procedure is not required, since a TVP can also be sent as a parameter to an ad hoc query, but working with a stored procedure is the better approach anyway.
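
For completeness, the ad hoc variant would look something like the sketch below. The dbo.ImportStaging table is made up purely for illustration, and the TVP uses the ImportStructure type defined further down; the key difference is that TypeName must be set explicitly on the parameter, since there is no stored procedure for SqlClient to infer the type from:

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Server;

public static class AdHocTvpExample
{
    // Sends a TVP to an ad hoc batch instead of a stored procedure.
    public static void SendAdHoc(SqlConnection connection,
                                 IEnumerable<SqlDataRecord> rows)
    {
        string sql = @"INSERT INTO dbo.ImportStaging (SKU, Name, LaunchDate, Quantity)
                       SELECT SKU, Name, LaunchDate, Quantity
                       FROM   @ImportTable;";

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            SqlParameter tvpParam = command.Parameters.Add("@ImportTable",
                                                           SqlDbType.Structured);
            tvpParam.TypeName = "dbo.ImportStructure"; // required for ad hoc SQL
            tvpParam.Value = rows;                     // streamed IEnumerable

            command.ExecuteNonQuery();
        }
    }
}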

There are two main patterns for using a TVP while reading from a file (and each of them relies upon passing in a method that returns IEnumerable<SqlDataRecord> rather than a DataTable):

  1. Execute the "import" Stored Procedure, which initiates the file open and reading. All rows are read (and can be validated at this point) and streamed into the Stored Procedure. In this approach, the Stored Procedure is executed only once, and all rows are sent in as a single set. This is the easier approach, but for larger data sets (e.g. millions of rows) it might not perform the best if the operation merges the data directly into a live table rather than simply loading it into a staging table. The memory required for this approach is the size of 1 record.

  2. Create a variable for int _BatchSize, open the file, and either:

    1. Create a collection to hold the batch of records
      1. Loop for _BatchSize or until no more lines to read from the file
        1. Read a line
        2. Validate
        3. Store valid entry in the collection
      2. Execute the Stored Procedure at the end of each loop, streaming in the collection.
      3. The memory required for this approach is the size of 1 record * _BatchSize.
      4. The benefit is that the transaction in the DB does not depend on any disk I/O latency or business logic latency.
    2. Loop to execute the Stored Procedure until no more lines to read from the file
      1. Execute the Stored Procedure
        1. Loop for _BatchSize or until no more lines to read from the file
          1. Read a line
          2. Validate
          3. Stream record into SQL Server
      2. The memory required for this approach is the size of 1 record.
      3. The downside is that the transaction in the DB depends on disk I/O latency and/or business logic latency, hence could be open longer, hence more potential for blocking.

I have a full example of pattern #1 over on StackOverflow in the following answer: How can I insert 10 million records in the shortest time possible?
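
If you just want the general shape of pattern #1, here is a minimal sketch (not taken from that answer): the iterator opens the file itself and yields every row as a single set, so the stored procedure is executed exactly once with the TVP parameter's Value set to StreamEntireFile(path). It reuses the contrived column layout from the example further down and splits on the tilde mentioned in the question; StreamEntireFile is just an illustrative name.

using System;
using System.Collections.Generic;
using System.Data;
using System.IO;
using Microsoft.SqlServer.Server;

private static IEnumerable<SqlDataRecord> StreamEntireFile(string ImportFilePath)
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("BatchRecordID", SqlDbType.Int, true, false,
                      SortOrder.Unspecified, -1),
      new SqlMetaData("Name", SqlDbType.NVarChar, 200),
      new SqlMetaData("SKU", SqlDbType.VarChar, 50),
      new SqlMetaData("LaunchDate", SqlDbType.DateTime),
      new SqlMetaData("Quantity", SqlDbType.Int)
   };

   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);

   using (StreamReader _FileReader = new StreamReader(ImportFilePath))
   {
      while (!_FileReader.EndOfStream)
      {
         string[] _InputLine = _FileReader.ReadLine().Split(new char[]{'~'});

         // Validate here; skip the line instead of yielding if it is bad

         // field 0 (BatchRecordID) is server-generated, so it is not set
         _DataRecord.SetString(1, _InputLine[0]);
         _DataRecord.SetString(2, _InputLine[1]);
         _DataRecord.SetDateTime(3, DateTime.Parse(_InputLine[2]));
         _DataRecord.SetInt32(4, Int32.Parse(_InputLine[3]));

         yield return _DataRecord;
      }
   }
}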

For pattern #2.1 (a highly scalable approach), I have a partial example below:

Required database objects (using a contrived structure):

First, you need a User-Defined Table Type (UDTT).

Please note the use of UNIQUE, DEFAULT, and CHECK Constraints to enforce data integrity before the records even hit SQL Server. The Unique Constraint is also how you create an index on a Table Variable :).

CREATE TYPE [ImportStructure] AS TABLE
(
    BatchRecordID INT IDENTITY(1, 1) NOT NULL,
    Name NVARCHAR(200) NOT NULL,
    SKU VARCHAR(50) NOT NULL UNIQUE,
    LaunchDate DATETIME NULL,
    Quantity INT NOT NULL DEFAULT (0),
    CHECK ([Quantity] >= 0)
);
GO

Next, use the UDTT as an input parameter to an import stored procedure (hence "Table-Valued Parameter").

CREATE PROCEDURE dbo.ImportData (
   @CustomerID     INT,
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

UPDATE prod
SET    prod.[Name] = imp.[Name],
       prod.[LaunchDate] = imp.[LaunchDate],
       prod.[Quantity] = imp.[Quantity]
FROM   [Inventory].[Products] prod
INNER JOIN @ImportTable imp
        ON imp.[SKU] = prod.[SKU]
WHERE  prod.CustomerID = @CustomerID;

INSERT INTO [Inventory].[Products] ([CustomerID], [SKU], [Name], [LaunchDate], [Quantity])
    SELECT  @CustomerID, [SKU], [Name], [LaunchDate], [Quantity]
    FROM    @ImportTable imp
    WHERE   NOT EXISTS (SELECT prod.[SKU]
                        FROM   [Inventory].[Products] prod
                        WHERE  prod.[SKU] = imp.[SKU]
                       );
GO

App Code:

First we will define the class used to hold each record of the batch (the batch itself is a List of these):

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

// Holds one parsed record; the current batch is a List<ImportBatch>.
private class ImportBatch
{
  public string Name;
  public string SKU;
  public DateTime LaunchDate;
  public int Quantity;
}

Next we define the method used to stream the data from the collection into SQL Server. Please note:

  • The SqlMetaData entry for BatchRecordID is defined even though it is an IDENTITY field. However, it is defined in such a way as to indicate that the value is server-generated.
  • The yield return returns the record but then control comes right back to the next line (which just goes back to the top of the loop).
private static IEnumerable<SqlDataRecord> SendImportBatch(List<ImportBatch> RecordsToSend)
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("BatchRecordID", SqlDbType.Int, true, false,
                      SortOrder.Unspecified, -1),
      new SqlMetaData("Name", SqlDbType.NVarChar, 200),
      new SqlMetaData("SKU", SqlDbType.VarChar, 50),
      new SqlMetaData("LaunchDate", SqlDbType.DateTime),
      new SqlMetaData("Quantity", SqlDbType.Int)
   };

   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);

   // Stream the collection into SQL Server without first
   // copying it into a DataTable.
   foreach (ImportBatch _RecordToSend in RecordsToSend)
   {
      // we don't set field 0 as that is the IDENTITY field
      _DataRecord.SetString(1, _RecordToSend.Name);
      _DataRecord.SetString(2, _RecordToSend.SKU);
      _DataRecord.SetDateTime(3, _RecordToSend.LaunchDate);
      _DataRecord.SetInt32(4, _RecordToSend.Quantity);

      yield return _DataRecord;
   }
}

Finally, we define the overall import processing operation. It opens the connection to SQL Server and the file, then cycles through the file, reading and validating up to _BatchSize records per cycle. The stored procedure parameters are only defined once because they do not change: the CustomerID value stays the same, and the TVP parameter value is merely a reference to a method -- SendImportBatch -- that only gets invoked when the stored procedure is executed via ExecuteNonQuery. The collection passed into that method is a reference type, so each execution streams whatever is currently in _CurrentBatch.

public static void ProcessImport(int CustomerID)
{
   int _BatchSize = GetBatchSize();
   string _ImportFilePath = GetImportFileForCustomer(CustomerID);

   List<ImportBatch> _CurrentBatch = new List<ImportBatch>();
   ImportBatch _CurrentRecord;

   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   // Parameters do not require leading "@" when using CommandType.StoredProcedure
   SqlParameter _ParamCustomerID = new SqlParameter("CustomerID", SqlDbType.Int);
   _ParamCustomerID.Value = CustomerID;
   _Command.Parameters.Add(_ParamCustomerID);

   SqlParameter _ParamImportTbl = new SqlParameter("ImportTable", SqlDbType.Structured);
   // TypeName is not needed when using CommandType.StoredProcedure
   //_ParamImportTbl.TypeName = "dbo.ImportStructure";
   // Parameter value is method that returns streamed data (IEnumerable)
   _ParamImportTbl.Value = SendImportBatch(_CurrentBatch);
   _Command.Parameters.Add(_ParamImportTbl);

   StreamReader _FileReader = null;

   try
   {
      int _RecordCount;
      string[] _InputLine = new string[4];

      _Connection.Open();

      _FileReader = new StreamReader(_ImportFilePath);

       // process the file
       while (!_FileReader.EndOfStream)
       {
          _RecordCount = 1;

          // process a batch
          while (_RecordCount <= _BatchSize
                  && !_FileReader.EndOfStream)
          {
             _CurrentRecord = new ImportBatch();

             _InputLine = _FileReader.ReadLine().Split(new char[]{','});
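             // NOTE: the real file from the question is tilde-delimited,
             // so the Split above would use '~' there instead of ','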

             _CurrentRecord.Name = _InputLine[0];
             _CurrentRecord.SKU = _InputLine[1];
             _CurrentRecord.LaunchDate = DateTime.Parse(_InputLine[2]);
             _CurrentRecord.Quantity = Int32.Parse(_InputLine[3]);

             // Do validations, transformations, etc.
             // (placeholder check -- swap in the real business rules)
             if (String.IsNullOrWhiteSpace(_CurrentRecord.SKU))
             {
                _CurrentRecord = null;
                continue; // skip to next line in the file
             }

             _CurrentBatch.Add(_CurrentRecord);
             _RecordCount++; // only increment for valid records
          }

          _Command.ExecuteNonQuery(); // send batch to SQL Server

          _CurrentBatch.Clear();
       }
   }
   finally
   {
      if (_FileReader != null)
      {
         _FileReader.Close();
      }

      _Connection.Close();
   }

   return;
}

If the READONLY nature of the TVP prohibits some validation and/or transformation that is needed before being merged into the destination table, then the data in the TVP can be easily transferred to a local temporary table at the beginning of the Stored Procedure.