SqlBulkCopy implementation with transaction

Topics: Data Access Application Block, General discussion
Nov 23, 2011 at 11:13 AM

I have an implementation that inserts records into the database via a table-valued parameter, but there is a limit on the number of records I can insert this way.

So I now want to insert into the table directly via SqlBulkCopy, but within a transaction.

My current implementation is as follows

 /// <summary>
 /// Imports a job inside a single database transaction: first the header record,
 /// then (only if the header succeeded) the package quantity records.
 /// The transaction is committed when every step reports success and rolled back otherwise.
 /// </summary>
 /// <param name="objMailDatLists">The job data passed through to the individual insert steps.</param>
 /// <returns><c>true</c> if all steps succeeded and the transaction was committed; otherwise <c>false</c>.</returns>
 /// <remarks>Any exception is logged via <c>LoggingHandler.Log</c> and then rethrown.</remarks>
 private static bool ImportLevelTwoValidatedJobAsInsert(MailDATLists objMailDatLists)
        {
            bool isAdded = false;
            try
            {
                SqlDatabase objDB = DatabaseFactory.CreateDatabase() as SqlDatabase;
                if (objDB == null)
                {
                    // Fail with a descriptive error instead of a NullReferenceException further down.
                    throw new InvalidOperationException("DatabaseFactory.CreateDatabase() did not return a SqlDatabase.");
                }

                // The using blocks guarantee that the command, transaction and connection
                // are disposed on every exit path.
                using (DbConnection dbConn = objDB.CreateConnection())
                {
                    dbConn.Open();
                    using (DbTransaction dbTrans = dbConn.BeginTransaction())
                    using (DbCommand objCMD = objDB.GetStoredProcCommand("uspAddHeaderRecord"))
                    {
                        try
                        {
                            objCMD.Connection = dbTrans.Connection;
                            objCMD.CommandTimeout = 900;
                            isAdded = HeaderRecordDAL.GetInstance.AddHeaderRecords(objDB, objCMD, dbTrans, objMailDatLists);
                            if (isAdded)
                            {
                                isAdded = AddPackageQuantityRecordsBulk(objDB, objCMD, dbTrans, objMailDatLists);
                                // ............ (further steps elided in the original post)
                            }

                            if (isAdded)
                            {
                                dbTrans.Commit();
                            }
                            else
                            {
                                dbTrans.Rollback();
                            }
                        }
                        catch (Exception)
                        {
                            // Rollback can itself throw (e.g. the transaction is already completed
                            // or the connection is broken). Log that failure separately so it does
                            // not replace the exception that caused the rollback.
                            try
                            {
                                dbTrans.Rollback();
                            }
                            catch (Exception rollbackEx)
                            {
                                LoggingHandler.Log(rollbackEx);
                            }
                            throw;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                LoggingHandler.Log(ex);
                throw;
            }
            return isAdded;
        }

        /// <summary>
        /// Adds the header-record parameters to <paramref name="objCMD"/>, executes it inside
        /// <paramref name="dbTrans"/>, and reports success based on the <c>@ErrorVar</c> output parameter.
        /// </summary>
        /// <param name="objDB">Database used to add parameters and execute the command.</param>
        /// <param name="objCMD">Command to execute; its parameter collection is cleared before returning.</param>
        /// <param name="dbTrans">Transaction the command is executed in.</param>
        /// <param name="objMailDatLists">Source of the header record and the branch id.</param>
        /// <returns><c>false</c> when <c>@ErrorVar</c> comes back as "-1"; otherwise <c>true</c>.</returns>
        public bool AddHeaderRecords(Database objDB, DbCommand objCMD, DbTransaction dbTrans, MailDATLists objMailDatLists)
        {
            HeaderRecord headerRecord = objMailDatLists.GetHdrRecord();
            // NOTE(review): `obj` is not declared anywhere in this method, and this assignment
            // replaces the value fetched on the previous line. Confirm which source is intended.
            headerRecord = HeaderRecord.GetHDRInfoFromFlatFileObject(obj);

            objDB.AddInParameter(objCMD, "@hdrOriginalFlatFileJobID", DbType.String, headerRecord.hdrOriginalFlatFileJobID);
            objDB.AddInParameter(objCMD, "@hdrCreatedBy", DbType.Int32, headerRecord.hdrCreatedBy);
            objDB.AddInParameter(objCMD, "@hdrBranchID", DbType.Int32, objMailDatLists.objJobInfoXml.BranchID);
            objDB.AddOutParameter(objCMD, "@ErrorVar", DbType.Int32, 1);
            objDB.ExecuteNonQuery(objCMD, dbTrans);

            // Read the output value before the parameters are cleared.
            string errorCode = objCMD.Parameters["@ErrorVar"].Value.ToString();
            bool succeeded = !errorCode.Equals("-1");

            objCMD.Parameters.Clear();
            return succeeded;
        }

        /// <summary>
        /// Inserts package quantity records in batches by passing a table-valued parameter to
        /// <c>uspAddPackageQuantityRecordViaBulk</c>, all within <paramref name="dbTrans"/>.
        /// </summary>
        /// <param name="objDB">Database used to create and execute the stored procedure command.</param>
        /// <param name="objCMD">
        /// Caller's command. It is not executed here; only its <c>CommandTimeout</c> is copied
        /// to the command this method creates.
        /// </param>
        /// <param name="dbTrans">Transaction every batch is executed in.</param>
        /// <param name="objMailDatLists">Supplies the file path and import job info for the records.</param>
        /// <returns>
        /// <c>true</c> if at least one batch was executed and every batch succeeded;
        /// <c>false</c> if a batch reported "-1" in <c>@ErrorVar</c> or there were no records at all.
        /// </returns>
        public bool AddPackageQuantityRecordsBulk(SqlDatabase objDB, DbCommand objCMD, DbTransaction dbTrans, MailDATLists objMailDatLists)
        {
            bool isAdded = false;
            DateTime dtCurrent = DateTime.Now;
            DataTable dt = this.ToDataTableFromClass<PackageQuantityRecord>();

            // Use a dedicated, disposed command rather than overwriting the caller's reference.
            using (DbCommand bulkCmd = objDB.GetStoredProcCommand("uspAddPackageQuantityRecordViaBulk"))
            {
                if (objCMD != null)
                {
                    // A new command starts with the provider's default timeout; keep the
                    // timeout the caller configured so large batches get the same allowance.
                    bulkCmd.CommandTimeout = objCMD.CommandTimeout;
                }

                foreach (PQT obj in NineOneParser.GetPQTRecordList(objMailDatLists.LevelOneValidatedFilePath, objMailDatLists.ObjImportJobInfo))
                {
                    PackageQuantityRecord objPQT = PackageQuantityRecord.GetPQTInfoFromFlatFileObject(obj);
                    PackageQuantityRecord.SetImportJobInfo(objPQT, objMailDatLists.ObjImportJobInfo);
                    dt.Rows.Add(BuildPackageQuantityRow(dt, objPQT, dtCurrent));

                    // Flush based on the rows actually buffered. A separate counter that is reset
                    // before the current row is added can miss a trailing row.
                    if (dt.Rows.Count >= BulkDMLConfiguration.PQTBulkInsertLimit)
                    {
                        isAdded = ExecutePackageQuantityBatch(objDB, bulkCmd, dbTrans, dt);
                        if (!isAdded)
                        {
                            return false;
                        }
                        dt.Clear();
                        dt.AcceptChanges();
                    }
                }

                // Send whatever is left over after the last full batch.
                if (dt.Rows.Count > 0)
                {
                    isAdded = ExecutePackageQuantityBatch(objDB, bulkCmd, dbTrans, dt);
                }
            }
            return isAdded;
        }

        /// <summary>
        /// Executes one batch: passes <paramref name="batch"/> as <c>@tblPQTDataTable</c> and
        /// returns <c>false</c> when the <c>@ErrorVar</c> output parameter is "-1".
        /// The command's parameters are always cleared so it can be reused for the next batch.
        /// </summary>
        private static bool ExecutePackageQuantityBatch(SqlDatabase objDB, DbCommand cmd, DbTransaction dbTrans, DataTable batch)
        {
            try
            {
                objDB.AddInParameter(cmd, "@tblPQTDataTable", SqlDbType.Structured, batch);
                objDB.AddOutParameter(cmd, "@ErrorVar", DbType.Int32, 1);
                objDB.ExecuteNonQuery(cmd, dbTrans);
                return !cmd.Parameters["@ErrorVar"].Value.ToString().Equals("-1");
            }
            finally
            {
                cmd.Parameters.Clear();
            }
        }

        /// <summary>
        /// Creates a new row of <paramref name="dt"/> populated from <paramref name="objPQT"/>.
        /// The row is returned detached; the caller adds it to the table.
        /// </summary>
        private static DataRow BuildPackageQuantityRow(DataTable dt, PackageQuantityRecord objPQT, DateTime createdDate)
        {
            DataRow dr = dt.NewRow();
            dr["fkJobID"] = objPQT.fkJobID;
            dr["fkCQTDatabaseID"] = objPQT.fkCQTDatabaseID;
            dr["pqtPackageID"] = objPQT.pqtPackageID;
            dr["pqtCreatedDate"] = createdDate;
            dr["pqtPackageZipCode"] = objPQT.pqtPackageZipCode;
            dr["pqtPackageLevel"] = objPQT.pqtPackageLevel;
            dr["pqtNumberofCopies"] = objPQT.pqtNumberofCopies;
            dr["pqtNumberofPieces"] = objPQT.pqtNumberofPieces;
            dr["pqtPQTRecordStatus"] = objPQT.pqtPQTRecordStatus;
            dr["pqtPackageCarrierRoute"] = objPQT.pqtPackageCarrierRoute;
            dr["pqtPackageBarcode"] = objPQT.pqtPackageBarcode;
            dr["pqtPackageStatus"] = objPQT.pqtPackageStatus;
            // Optional values are only written when present; otherwise the column keeps its default.
            if (objPQT.pqtBundleChargeAllocation.HasValue)
            {
                dr["pqtBundleChargeAllocation"] = objPQT.pqtBundleChargeAllocation;
            }
            //11-1 Field:
            dr["pqtUniqueCharacteristics"] = objPQT.pqtUniqueCharacteristics;
            if (objPQT.pqtRecordRowNumber.HasValue)
            {
                dr["pqtRecordRowNumber"] = objPQT.pqtRecordRowNumber;
            }
            dr["pqtCreatedBy"] = objPQT.pqtCreatedBy;
            dr["pqtIsDeleted"] = false;
            return dr;
        }
I need to make the changes in the AddPackageQuantityRecordsBulk function.

Nov 23, 2011 at 3:35 PM

My current limit with the table-valued parameter is 20,000 records (more than that didn't work with my table-valued parameter).

I am trying to increase this limit to, let's say, 200,000 and do the insert via SqlBulkCopy.

Nov 23, 2011 at 11:49 PM

FYI: I just ran a quick table valued parameter test with over 300000 records and it worked fine.

In terms of SqlBulkCopy, Here's information on using SqlBulkCopy with Enterprise Library.  If you want to use your own transaction, just pass it in to the constructor.

For example:

// Direct casts fail immediately with an InvalidCastException if the configured
// database is not SQL Server, instead of passing null further down.
var db = (SqlDatabase)EnterpriseLibraryContainer.Current.GetInstance<Database>();

using (var conn = (SqlConnection)db.CreateConnection())
{
    conn.Open();

    // Both the transaction and the bulk copy are disposable. If WriteToServer throws,
    // Commit is never reached and the transaction is disposed uncommitted
    // (SqlClient is expected to roll it back on dispose; call Rollback explicitly if you need to be certain).
    using (var tx = conn.BeginTransaction())
    using (var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, tx))
    {
        // Map columns by name so the DataTable's column order does not have to match the table's.
        foreach (DataColumn col in dt.Columns)
        {
            bulkCopy.ColumnMappings.Add(col.ColumnName, col.ColumnName);
        }

        bulkCopy.DestinationTableName = "dbo.Category";
        bulkCopy.WriteToServer(dt);

        tx.Commit();
    }
}

 

--
Randy Levy
Enterprise Library support engineer
entlib.support@live.com

Nov 24, 2011 at 6:06 AM

Thanks randylevy

As I am inserting in a loop, is it possible to see whether each run was successful or not?

like

isAdded=  bulkCopy.WriteToServer(dt);

Also, I will call commit in the main transaction method,
ImportLevelTwoValidatedJobAsInsert.
Nov 24, 2011 at 6:17 AM

WriteToServer does not return a value so you should expect any failures to appear as exceptions.  
You can commit your transaction in the main method -- you already are passing the transaction into AddPackageQuantityRecordsBulk.

--
Randy Levy
Enterprise Library support engineer
entlib.support@live.com

Nov 24, 2011 at 7:47 AM

Ok