SQL Server – T-SQL: CSV-to-table pipeline with custom-parsed numeric data and lookup values

csv | import | sql-server | sql-clr | sql-server-2008

Problem

I wish to insert thousands of records into a table [Child] from a CSV file (and do this from tens of files into the same table). However, I want to (A) substitute a lookup value into the table (column values from [Parent]) and (B) perform other computations on the CSV data before the insert is made. How is this best accomplished?

The lookup (A): I want to retrieve an IDENTITY-based primary key value from [Parent] (to store as Child.P_ID) where [Parent].NVarCharValue and [Parent].DateTimeValue match the first two fields of the CSV data. I would rather not store NVarCharValue and DateTimeValue themselves in [Child]: future joins with other child-level tables will be on the P_ID field, so that text info is redundant, would chew up space, and would perhaps cause additional lag time on insertions (although perhaps not as much as the lookups will…but alas…).

As an example of the computationally derived data (B), I'd like to take encoded GPS coordinates in [d]ddmm.mmmm,C (DegreesDecimalMinutes,Direction) format and turn them into a single decimal-degrees field (or half of a geography::Point). For example, my CSV contains such lovely entries as 3748.9729,S, which should be parsed as 37° 48.9729' S and then translated to -37.816215, which is what would be stored for Latitude.
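
To make that arithmetic concrete, here is a minimal T-SQL sketch of the conversion, wrapped in a hypothetical helper function (the function name and parameters are purely illustrative):

    -- Sketch only: turn '[d]ddmm.mmmm' plus a direction letter into signed decimal degrees,
    -- e.g. ('3748.9729', 'S') -> -37.816215
    CREATE FUNCTION dbo.fn_DDMToDecimalDegrees
    (
        @magnitude varchar(12),   -- e.g. '3748.9729'
        @direction char(1)        -- 'N', 'S', 'E' or 'W'
    )
    RETURNS decimal(10,6)
    AS
    BEGIN
        DECLARE @ddm decimal(9,4) = CAST(@magnitude AS decimal(9,4));

        RETURN CAST(
                 ( FLOOR(@ddm / 100)                                    -- degrees: 37
                   + (@ddm % 100) / 60.0 )                              -- minutes: 48.9729 / 60 = 0.816215
                 * CASE WHEN @direction IN ('S','W') THEN -1 ELSE 1 END -- south/west are negative
               AS decimal(10,6));
    END;

    -- SELECT dbo.fn_DDMToDecimalDegrees('3748.9729', 'S');  -- returns -37.816215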

Options? I came up with five possibilities, but I don't want to jump into coding any one of them until I've considered the pitfalls of each approach.


Option 1: Basic Insert Into view + BULK INSERT

Even though I intend to insert only into the child table, the multi-table join needed to do the lookup probably violates SQL Server's restriction that an INSERT through a view may affect only one underlying base table. It would certainly violate the rule that columns being inserted through a view cannot be derived from computations or aggregations in the view's DDL.

Otherwise, this is (essentially) what I’d like to do

CREATE VIEW [dbo].[ivChild] as
SELECT
  P.[NVarCharValue],
  P.[DateTimeValue],
  CAST(CAST(ABS(C.Latitude) as Integer) as VarChar(3))
    + STR(ABS(C.Latitude - CAST(C.Latitude as Integer)) * 60.0, 9, 6)
    as LatitudeMagnitude, -- not quite correct (single-digit minutes), but close enough for the example
  CASE
    WHEN SIGN(C.Latitude) > 0 THEN 'N'
    ELSE 'S'
  END as LatitudeDir,
  CAST(CAST(ABS(C.Longitude) as Integer) as VarChar(3))
    + STR(ABS(C.Longitude - CAST(C.Longitude as Integer)) * 60.0, 9, 6)
    as LongitudeMagnitude,
  CASE
    WHEN SIGN(C.Longitude) > 0 THEN 'E'
    ELSE 'W'
  END as LongitudeDir
FROM Child as C join Parent as P on P.ID = C.P_ID

But maybe I could use a subquery for each of the first two columns so that the insert affects only Child?
e.g.

SELECT
  (SELECT P.NVarCharValue from Parent as P where P.ID = C.P_ID) as NVarCharValue,
  (SELECT P.DateTimeValue from Parent as P where P.ID = C.P_ID) as DateTimeValue,
    …

Option 2: BULK INSERT into Staging Table, Stored procedure merge into actual data table?

Insert the NVarCharValue and DateTimeValue columns from the CSV into the staging table, then run either a MERGE or an INSERT…SELECT query to migrate the values to the data table, performing the computations in-line (if possible) to do the lookup and to parse the string inputs into their final stored values (e.g. decimal degrees parsed from the string and stored as decimal(10,6)).
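
Something along these lines, reusing the hypothetical dbo.fn_DDMToDecimalDegrees sketched above and assuming an illustrative staging table [ChildStage] (a MERGE would follow the same shape):

    -- Sketch only: migrate from the staging table, doing the Parent lookup
    -- and the GPS parse in-line
    INSERT INTO dbo.Child (P_ID, Latitude, Longitude)
    SELECT  P.ID,
            dbo.fn_DDMToDecimalDegrees(S.LatitudeRaw,  S.LatitudeDir),
            dbo.fn_DDMToDecimalDegrees(S.LongitudeRaw, S.LongitudeDir)
    FROM    dbo.ChildStage AS S
    JOIN    dbo.Parent     AS P
              ON  P.NVarCharValue = S.NVarCharValue
              AND P.DateTimeValue = S.DateTimeValue;

(For thousands of rows per file, the scalar function call could be inlined as a plain expression if it turns out to be a bottleneck.)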

Option 3: Create custom CLR for GPSMagnitude

which would parse the incoming string in C#, store it internally in the class as a decimal number, and provide an implicit conversion to the decimal data type. Something like:

public static implicit operator Decimal(GPSMagnitude c)    // GPS -> decimal
    { return c._decimalMagnitude; }

public static implicit operator GPSMagnitude(Decimal c)    // decimal -> GPS
    { return new GPSMagnitude(c); }

public static implicit operator string(GPSMagnitude c)     // GPS -> string
    {
        int d = (int)c._decimalMagnitude;                   // whole degrees
        decimal m = (c._decimalMagnitude - d) * 60.0m;      // fractional degrees -> minutes
        return d.ToString() + m.ToString();
    }

[SqlMethod(OnNullCall = false)]
public static GPSMagnitude Parse(SqlString s)               // string -> GPS
    {
        decimal f;
        Decimal.TryParse(s.Value, out f);
        int deg = (int)f / 100;                             // [d]dd part
        decimal mn = f - deg * 100.0m;                      // mm.mmmm part
        return new GPSMagnitude(deg + mn / 60.0m);
    }

Then use a view with CAST(LatitudeMagnitude as GPSMagnitude) specified, with an underlying table where LatitudeMagnitude is declared as decimal(10,6). I'd still have to do some funny business to merge the two columns to get the magnitude signed (S/W are negative), and I would still have to deal with the P_ID lookup issue independently.

Option 4: BULK INSERT with FIRE_TRIGGERS

Do the parsing in a custom trigger, which would take the string input and write it to the table as a decimal. From what I hear this slows BULK INSERT down considerably, but I should be able to do both the lookup and the GPS text parsing. I haven't given a whole lot of thought to what the trigger code would be, beyond something roughly like the sketch below.
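
A very rough sketch, assuming the BULK INSERT lands in a thin raw-text table [ChildRaw] (an assumption on my part) and WITH (FIRE_TRIGGERS) is specified; the body ends up looking much like the Option 2 query, just driven by the inserted pseudo-table:

    -- Sketch only: AFTER INSERT trigger that parses the raw rows and does the
    -- Parent lookup before writing to Child
    CREATE TRIGGER dbo.trg_ChildRaw_Parse
    ON dbo.ChildRaw
    AFTER INSERT
    AS
    BEGIN
        SET NOCOUNT ON;

        INSERT INTO dbo.Child (P_ID, Latitude, Longitude)
        SELECT  P.ID,
                dbo.fn_DDMToDecimalDegrees(i.LatitudeRaw,  i.LatitudeDir),
                dbo.fn_DDMToDecimalDegrees(i.LongitudeRaw, i.LongitudeDir)
        FROM    inserted   AS i
        JOIN    dbo.Parent AS P
                  ON  P.NVarCharValue = i.NVarCharValue
                  AND P.DateTimeValue = i.DateTimeValue;
    END;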

Option 5: Pre-parse my CSV

in, e.g., Python to do the GPS calculations, combining the magnitude and sign columns. I'd still have to deal with the P_ID lookup issue, though.

Option 6: Your solution to my ugly input dataset.

I can flesh out more of these options if you're confused by what I have. :P


Based on Solomon's suggestion, I'm beginning to implement a C# stream-parser.

To grab the existing structure of a table, the following code should work (possibly with a few tweaks). Note that it uses else-if chains rather than a switch/case, as that seemed more compact (not so many break statements needed, etc.); I'm not sure which would be more efficient. I also haven't covered every column type, so it won't handle all contingencies, but for the most basic types it should work okay. Again, this is just for approximating the table fields you already have set up in SQL Server.

    public SqlMetaData[] ServerLookupFields()
    {
        using (SqlConnection conn = new SqlConnection(ParserDict.ConnectionString))
        {
            SqlCommand command;
            int siz;
            Console.WriteLine("Activating SQL connection");
            conn.Open();
            Console.WriteLine("SQL connection open");
            string num = "select count(*) as cnt from sys.columns as sc join sys.tables as st on sc.object_id = st.object_id where st.name=@table_name;";
            command = new SqlCommand(num, conn);
            command.Parameters.AddWithValue("@table_name", this.tableDestination);
            string result = command.ExecuteScalar().ToString();
            Int32.TryParse(result, out siz);
            command.Dispose();
            SqlMetaData[] smd = new SqlMetaData[siz];
            string cmd = "exec sp_columns @table_name=@table_name, @table_owner=dbo";
            command = new SqlCommand(cmd, conn);
            command.Parameters.AddWithValue("@table_name", this.tableDestination);
            SqlDataReader rd = command.ExecuteReader();
            int i = 0;
            while (rd.Read())
            {
                smd[i] = ReadSingleMetaRow((IDataRecord)rd);
                Console.WriteLine(smd[i].Name.ToString()+","+smd[i].DbType.ToString());
                i++;
            }
            rd.Close();
            command.Dispose();
            return smd;
        }
    }
    private SqlMetaData ReadSingleMetaRow(IDataRecord meta)
    {
        bool nullable;
        byte precision;
        int length;   // sp_columns LENGTH can exceed 255, so byte is too small here
        byte scale;
        SqlMetaData smd;
        string columnName = meta[3].ToString(); //Column_name
        string dataType = meta[5].ToString(); //Type_name
        string dt_fw_lower = dataType.ToLower().Split(new Char[] { ' ' }, 2)[0]; //dataType_firstWord_ToLower
        byte.TryParse(meta[6].ToString(), out precision); //Precision
        int.TryParse(meta[7].ToString(), out length); //Length
        byte.TryParse(meta[8].ToString(), out scale); //Scale
        nullable = (meta[10].ToString() == "1"); //Nullable (sp_columns returns 1/0, not true/false)
        string datetimesub = meta[14].ToString(); //DateTimeSub(type)
        if (dt_fw_lower == "datetime2") { smd = new SqlMetaData(columnName, SqlDbType.DateTime2); }
        else if (dt_fw_lower == "nvarchar") { smd = new SqlMetaData(columnName, SqlDbType.NVarChar, length); }
        else if (dt_fw_lower == "varchar") { smd = new SqlMetaData(columnName, SqlDbType.VarChar, length); }
        else if (dt_fw_lower == "bigint") { smd = new SqlMetaData(columnName, SqlDbType.BigInt); }
        else if (dt_fw_lower == "float") { smd = new SqlMetaData(columnName, SqlDbType.Float); } // Float does not take precision/scale
        else if (dt_fw_lower == "decimal") { smd = new SqlMetaData(columnName, SqlDbType.Decimal, precision, scale); }
        else if (dt_fw_lower == "binary") { smd = new SqlMetaData(columnName, SqlDbType.Binary, length); }
        else if (dt_fw_lower == "varbinary") { smd = new SqlMetaData(columnName, SqlDbType.VarBinary, length); }
        else if (dt_fw_lower == "bit") { smd = new SqlMetaData(columnName, SqlDbType.Bit); }
        else if (dt_fw_lower == "char") { smd = new SqlMetaData(columnName, SqlDbType.Char, length); }
        else if (dt_fw_lower == "nchar") { smd = new SqlMetaData(columnName, SqlDbType.NChar, length); }
        else if (dt_fw_lower == "datetimeoffset") { smd = new SqlMetaData(columnName, SqlDbType.DateTimeOffset); }
        else if (dt_fw_lower == "datetime") { smd = new SqlMetaData(columnName, SqlDbType.DateTime); }
        else if (dt_fw_lower == "date") { smd = new SqlMetaData(columnName, SqlDbType.Date); }
        else if (dt_fw_lower == "image") { smd = new SqlMetaData(columnName, SqlDbType.Image, length); }
        else if (dt_fw_lower == "int") { smd = new SqlMetaData(columnName, SqlDbType.Int); }
        else if (dt_fw_lower == "money") { smd = new SqlMetaData(columnName, SqlDbType.Money); }
        else if (dt_fw_lower == "ntext") { smd = new SqlMetaData(columnName, SqlDbType.NText, length); }
        else if (dt_fw_lower == "real") { smd = new SqlMetaData(columnName, SqlDbType.Real); } // Real does not take precision/scale
        else if (dt_fw_lower == "smalldatetime") { smd = new SqlMetaData(columnName, SqlDbType.SmallDateTime); }
        else if (dt_fw_lower == "smallint") { smd = new SqlMetaData(columnName, SqlDbType.SmallInt); }
        else if (dt_fw_lower == "smallmoney") { smd = new SqlMetaData(columnName, SqlDbType.SmallMoney); }
        else if (dt_fw_lower == "structured") { smd = new SqlMetaData(columnName, SqlDbType.Structured, length); }
        else if (dt_fw_lower == "text") { smd = new SqlMetaData(columnName, SqlDbType.Text, -1); }
        else if (dt_fw_lower == "timestamp") { smd = new SqlMetaData(columnName, SqlDbType.Timestamp); }
        else if (dt_fw_lower == "time") { smd = new SqlMetaData(columnName, SqlDbType.Time); }
        else if (dt_fw_lower == "tinyint") { smd = new SqlMetaData(columnName, SqlDbType.TinyInt); }
        else if (dt_fw_lower == "udt") { smd = new SqlMetaData(columnName, SqlDbType.Udt, length); }
        else if (dt_fw_lower == "uniqueidentifier") { smd = new SqlMetaData(columnName, SqlDbType.UniqueIdentifier); }
        else if (dt_fw_lower == "variant") { smd = new SqlMetaData(columnName, SqlDbType.Variant); }
        else if (dt_fw_lower == "xml") { smd = new SqlMetaData(columnName, SqlDbType.Xml); }
        else { smd = new SqlMetaData(columnName, SqlDbType.Variant); } // fallback for anything unmapped
        return smd;
    }

Best Answer

You don't have to do this in pieces. You can do all of this inline, in a set-based approach, in batches (so that the size of the input file is irrelevant). Just think in terms of how you would take a row from the file and process it from start to finish. Then, make sure that each step is operating on a set of rows instead of a single row.

This won't require any SQLCLR, but will use either XML or a Table-Valued Parameter (TVP) to transport the batch of rows to SQL Server after it has been processed locally.

The basic workflow is as follows (I have this documented in another answer but can't find it at the moment). It requires that a User-Defined Table Type be created to transport each row to the Stored Procedure (this is the TVP, which is available as of SQL Server 2008, which you have).

  1. Open the file
  2. Loop over N rows (a batch) until no rows left
    1. Perform validations
    2. Convert encoded GPS coordinates to decimal format
    3. Send the rows (using a method that implements IEnumerable<SqlDataRecord>, not a DataTable!) to the stored procedure, which will (see the T-SQL sketch after this list):
      1. Create a local temporary table with the TVP columns plus columns to hold any generated IDENTITY values.
      2. Populate the temp table from the TVP.
      3. INSERT into [Parent] the rows whose [Parent].NVarCharValue and [Parent].DateTimeValue combinations do not already exist. You can either capture the new IDENTITY values (along with NVarCharValue and DateTimeValue) into another temp table and then update the main temp table with them, or you might be able to skip that and simply JOIN to the [Parent] table when doing the final INSERT.
      4. If you captured new IDENTITY values for [Parent] into a second temp table, UPDATE the main temp table with those values, matching on NVarCharValue and DateTimeValue.
      5. If appropriate, UPDATE the [Child] table from the main temp table, JOINing on one or more columns that uniquely identify the rows (i.e. an Alternate Key). This is effectively a WHERE EXISTS.
      6. INSERT INTO [Child] from the main temp table, using WHERE NOT EXISTS on whatever column(s) uniquely identify the rows
  3. Close the file
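
To make the procedure side concrete, here is a rough T-SQL sketch of the stored procedure's steps above; the table type, procedure name, and every column are assumptions for illustration only, and the optional UPDATE step (5) is omitted:

    -- Sketch only: all names and columns are illustrative
    CREATE TYPE dbo.ChildImport AS TABLE
    (
        NVarCharValue  nvarchar(50),
        DateTimeValue  datetime2,
        Latitude       decimal(10,6),
        Longitude      decimal(10,6)
    );
    GO

    CREATE PROCEDURE dbo.Child_ImportBatch
        @Rows  dbo.ChildImport READONLY   -- the TVP, streamed from the app
    AS
    BEGIN
        SET NOCOUNT ON;

        -- steps 1 & 2: local temp table = TVP columns + room for the looked-up key
        CREATE TABLE #Batch
        (
            P_ID           int NULL,
            NVarCharValue  nvarchar(50),
            DateTimeValue  datetime2,
            Latitude       decimal(10,6),
            Longitude      decimal(10,6)
        );

        INSERT INTO #Batch (NVarCharValue, DateTimeValue, Latitude, Longitude)
        SELECT NVarCharValue, DateTimeValue, Latitude, Longitude
        FROM   @Rows;

        -- step 3: add Parent rows that do not already exist
        INSERT INTO dbo.Parent (NVarCharValue, DateTimeValue)
        SELECT DISTINCT B.NVarCharValue, B.DateTimeValue
        FROM   #Batch AS B
        WHERE  NOT EXISTS (SELECT 1
                           FROM   dbo.Parent AS P
                           WHERE  P.NVarCharValue = B.NVarCharValue
                           AND    P.DateTimeValue = B.DateTimeValue);

        -- step 4 (simplified): pick up the IDENTITY values, new and pre-existing,
        -- by joining straight back to Parent
        UPDATE B
        SET    B.P_ID = P.ID
        FROM   #Batch AS B
        JOIN   dbo.Parent AS P
                 ON  P.NVarCharValue = B.NVarCharValue
                 AND P.DateTimeValue = B.DateTimeValue;

        -- step 6: insert only rows not already in Child
        -- (assumes P_ID + Latitude + Longitude is the alternate key; substitute yours)
        INSERT INTO dbo.Child (P_ID, Latitude, Longitude)
        SELECT B.P_ID, B.Latitude, B.Longitude
        FROM   #Batch AS B
        WHERE  NOT EXISTS (SELECT 1
                           FROM   dbo.Child AS C
                           WHERE  C.P_ID      = B.P_ID
                           AND    C.Latitude  = B.Latitude
                           AND    C.Longitude = B.Longitude);
    END;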

I have used this approach several times and with great success. It is fault-tolerant so it will easily resume where it left off if interrupted. And, it doesn't require any staging tables (which need to be cleaned out and get messy if you are multi-threading the import) as the local temporary table plays that role and is automatically cleaned up and is inherently thread-safe :-). This also lets you process pieces in the app layer that are most appropriate to handle there, and process the rest in the data layer. AND, as far as memory is concerned, you only ever have 1 batch worth of import file in memory at any given time, so it won't matter if the import file is 10 kB or 10 GB.

Also, you can get fancier with the process on the DB side if you like. I typically have a status column to determine: Insert, Update, or Skip. Then, after populating the main temp table, I compare to the existing destination table. If the row is there but one or more columns are different, it gets marked as Update. If the row is there but all columns are the same, it gets marked as Skip (no need to UPDATE rows that aren't changing). Else, it gets marked as Insert. Then, at the end, I UPDATE where Status = Update, then I INSERT where Status = Insert. Of course, I was importing into several tables that each had their own "funky" rules ;-).
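
A rough sketch of that status-marking step, continuing the illustrative #Batch and Child names from the sketch above (and assuming a Status column has been added to the temp table):

    -- Sketch only: classify each batch row, then act on the classification
    UPDATE B
    SET    B.Status = CASE
                        WHEN C.P_ID IS NULL              THEN 'Insert'  -- not in Child yet
                        WHEN C.Latitude  <> B.Latitude
                          OR C.Longitude <> B.Longitude  THEN 'Update'  -- present but changed
                        ELSE                                  'Skip'    -- present and identical
                      END
    FROM   #Batch AS B
    LEFT JOIN dbo.Child AS C
           ON C.P_ID = B.P_ID;   -- join on whatever the real alternate key is

    UPDATE C
    SET    C.Latitude  = B.Latitude,
           C.Longitude = B.Longitude
    FROM   dbo.Child AS C
    JOIN   #Batch    AS B ON B.P_ID = C.P_ID
    WHERE  B.Status = 'Update';

    INSERT INTO dbo.Child (P_ID, Latitude, Longitude)
    SELECT B.P_ID, B.Latitude, B.Longitude
    FROM   #Batch AS B
    WHERE  B.Status = 'Insert';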

You can see some sample code in another answer of mine on Stack Overflow: How can I insert 10 million records in the shortest time possible? Please note that the structure of that code is not batched. It executes the stored procedure once, which initiates the process of opening the file, reading and sending the rows, and finally closing the file. In order to make it batched, you would need to change it to something along the lines of:

  1. Open file
  2. while (! file.EOF)
    1. execute the Stored Procedure, passing the "SendRows" method (which takes the open file handle as an input parameter)
    2. "SendRows" method =
       while ( rowCount < batchSize ) // this is the "Loop over N rows" part of the workflow
        1. file.ReadLine()
        2. if ( file.EOF ) break;
        3. process and send row // Perform validations, Convert encoded GPS coordinates
        4. rowCount++;
  3. Close file