I have written a class that behaves like the built-in SqlBulkCopy class, but for Postgres. It wraps the COPY
command to provide fast uploads. The method for IEnumerable looks like this (there is a similar one for
DataTable):
/// <summary>
/// Bulk-loads <paramref name="data"/> into the table named by DestinationTableName
/// using a binary <c>COPY ... FROM STDIN</c> import on the open connection <c>conn</c>.
/// </summary>
/// <remarks>
/// The public properties of <typeparamref name="T"/> are matched to the destination
/// table's columns by position, not by name: property <c>i</c> is written to column <c>i</c>.
/// NOTE(review): this relies on the order returned by <c>Type.GetProperties()</c>, which the
/// .NET documentation does not guarantee - confirm it matches the table layout.
/// </remarks>
/// <typeparam name="T">Element type whose public properties supply the column values.</typeparam>
/// <param name="data">The rows to upload; each element becomes one row.</param>
/// <exception cref="Exception">
/// Thrown for every failure (missing table name, null <paramref name="data"/>, column count
/// mismatch, unsupported column type, conversion or database error). The original error is
/// available as the inner exception.
/// </exception>
public void WriteToServer<T>(IEnumerable<T> data)
{
    try
    {
        if (string.IsNullOrEmpty(DestinationTableName))
        {
            throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
        }
        if (data == null)
        {
            throw new ArgumentNullException("data");
        }

        PropertyInfo[] properties = typeof(T).GetProperties();
        int colCount = properties.Length;
        NpgsqlDbType[] types = new NpgsqlDbType[colCount];
        int[] lengths = new int[colCount];
        string[] quotedNames = new string[colCount];

        // Probe the destination table to learn each column's type, size and name.
        using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 1", conn))
        using (var rdr = cmd.ExecuteReader())
        {
            if (rdr.FieldCount != colCount)
            {
                throw new ArgumentOutOfRangeException("data", "Column count in Destination Table does not match column count in source table.");
            }
            var columns = rdr.GetColumnSchema();
            for (int i = 0; i < colCount; i++)
            {
                types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
                lengths[i] = columns[i].ColumnSize ?? 0;
                // Quote the identifier (doubling embedded quotes) so mixed-case or
                // reserved-word column names survive in the COPY column list.
                quotedNames[i] = "\"" + columns[i].ColumnName.Replace("\"", "\"\"") + "\"";
            }
        }

        string copyCommand = "COPY " + DestinationTableName + " (" + string.Join(", ", quotedNames) + ") FROM STDIN (FORMAT BINARY)";
        using (var writer = conn.BeginBinaryImport(copyCommand))
        {
            foreach (var item in data)
            {
                writer.StartRow();
                for (int i = 0; i < colCount; i++)
                {
                    // Read the property once per cell; reflection calls are not free.
                    object value = properties[i].GetValue(item);
                    if (value == null)
                    {
                        writer.WriteNull();
                    }
                    else
                    {
                        WriteValue(writer, value, types[i], lengths[i]);
                    }
                }
            }
            writer.Complete();
        }
    }
    catch (Exception ex)
    {
        throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer(). See inner exception for details", ex);
    }
}

/// <summary>
/// Writes one non-null cell to the binary importer, converting <paramref name="value"/>
/// to the CLR type that is written for the column's <paramref name="dbType"/>.
/// </summary>
/// <param name="writer">The active binary importer, positioned inside a started row.</param>
/// <param name="value">The non-null property value to write.</param>
/// <param name="dbType">The destination column's type as reported by the column schema.</param>
/// <param name="length">The destination column's size, or 0 when the schema reports none.</param>
/// <exception cref="NotSupportedException">The column type has no case below.</exception>
private static void WriteValue(NpgsqlBinaryImporter writer, object value, NpgsqlDbType dbType, int length)
{
    // Dispatch on the value's own runtime type. (Calling GetType() on the PropertyInfo,
    // as this code once did, yields the reflection type and never matches string/int/etc.)
    Type valueType = value.GetType();

    switch (dbType)
    {
        case NpgsqlDbType.Bigint:
            writer.Write((long)value, dbType);
            break;
        case NpgsqlDbType.Bit:
            if (length > 1)
            {
                writer.Write((byte[])value, dbType);
            }
            else
            {
                writer.Write((byte)value, dbType);
            }
            break;
        case NpgsqlDbType.Boolean:
            writer.Write((bool)value, dbType);
            break;
        case NpgsqlDbType.Bytea:
            writer.Write((byte[])value, dbType);
            break;
        case NpgsqlDbType.Char:
            if (valueType == typeof(string))
            {
                writer.Write((string)value, dbType);
            }
            else if (valueType == typeof(Guid))
            {
                writer.Write(value.ToString(), dbType);
            }
            else if (length > 1)
            {
                writer.Write((char[])value, dbType);
            }
            else
            {
                // Single-character column: send the first character of the text form.
                writer.Write(value.ToString()[0], dbType);
            }
            break;
        case NpgsqlDbType.Time:
        case NpgsqlDbType.Timestamp:
        case NpgsqlDbType.TimestampTz:
        case NpgsqlDbType.Date:
            // NOTE(review): some Npgsql versions may expect a different CLR type
            // (e.g. TimeSpan for time columns) - confirm against the version in use.
            writer.Write((DateTime)value, dbType);
            break;
        case NpgsqlDbType.Double:
            writer.Write((double)value, dbType);
            break;
        case NpgsqlDbType.Integer:
            if (valueType == typeof(int))
            {
                writer.Write((int)value, dbType);
            }
            else
            {
                // Strings, smaller integers, enums, ...: convert explicitly and let a
                // failure propagate instead of silently writing a malformed row.
                writer.Write(Convert.ToInt32(value, System.Globalization.CultureInfo.InvariantCulture), dbType);
            }
            break;
        case NpgsqlDbType.Interval:
            writer.Write((TimeSpan)value, dbType);
            break;
        case NpgsqlDbType.Numeric:
        case NpgsqlDbType.Money:
            writer.Write((decimal)value, dbType);
            break;
        case NpgsqlDbType.Real:
            writer.Write((Single)value, dbType);
            break;
        case NpgsqlDbType.Smallint:
            if (valueType == typeof(short))
            {
                writer.Write((short)value, dbType);
            }
            else
            {
                // e.g. byte properties: a boxed byte cannot be unboxed as short.
                writer.Write(Convert.ToInt16(value, System.Globalization.CultureInfo.InvariantCulture), dbType);
            }
            break;
        case NpgsqlDbType.Varchar:
        case NpgsqlDbType.Text:
            writer.Write((string)value, dbType);
            break;
        case NpgsqlDbType.Uuid:
            writer.Write((Guid)value, dbType);
            break;
        case NpgsqlDbType.Xml:
            writer.Write((string)value, dbType);
            break;
        default:
            // Skipping the cell would leave the row one field short, so fail loudly.
            throw new NotSupportedException("Column type " + dbType + " is not supported by WriteToServer.");
    }
}
You need to set the DestinationTableName property first, and conn needs to be an open connection.
Essentially, the method uses reflection to get the properties of the element type of the passed list. Obviously, the data types of the table being filled have to match! The writer builds up the rows by iterating through the list, and then does one bulk insert at the end. I may not have dealt with all the types that you need, but it should be clear how to add any that are missing.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…