创建 SSIS 自定义数据流组件 + 2008年国际安全峰会
我是 SSIS 自定义组件的新手。刚刚开始编写一个组件,其中输入行数永远不会与输出行数相同。对于每个输入行,它会进行一些验证并生成需要映射到输出缓冲区的 n 行。
因此,在设计时验证编码之后,一切都很好。
我的运行时代码如下:
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
inputBufferColumnIndex = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
inputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID (input.Buffer, column.LineageID);
}
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
outputBufferColumnIndex = new int[output.OutputColumnCollection.Count];
for (int x = 0; x < output.OutputColumnCollection.Count; x++)
{
IDTSOutputColumn100 outcol = output.OutputColumnCollection[x];
outputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID(input.Buffer, outcol.LineageID);
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
if(!buffer.EndOfRowset)
{
while (buffer.NextRow())
{
var rec = new Record
{
Source = buffer[0].ToString(),
Nk = buffer[1].ToString(),
Guid = new Guid(buffer[2].ToString()),
FromDate = Convert.ToDateTime(buffer[3].ToString()),
ToDate = Convert.ToDateTime(buffer[4].ToString())
};
sourceRecords.Add(rec);
}
ProcessArray(sourceRecords,buffer);
}
}
public void ProcessArray(List<Record> records, PipelineBuffer buffer)
{
//Get Distinct NKs from the source Records
List<string> nKs = (from c in records select c.Nk).Distinct().ToList();
foreach (var nk in nKs)
{
//Get all the record for particular NK
List<Record> filteredRecords = (from c in sourceRecords where c.Nk == nk select c)
.OrderBy(c => c.Source)
.ThenBy(c => c.FromDate)
.ThenBy(c => c.ToDate).ToList();
foreach (var filteredRecord in filteredRecords)
{
_start = filteredRecord.FromDate;
_end = filteredRecord.ToDate;
while (filteredRecord.WriteComplete == false)
{
foreach (var record in filteredRecords)
{
if (record.FromDate > _start && record.FromDate < _end) _end = record.ToDate;
if (record.ToDate < _end && record.ToDate > _start) _end = record.ToDate;
}
//Output0Buffer.AddRow();
//Output0Buffer.outSource = filteredRecord.Source;
//Output0Buffer.outNK = filteredRecord.Nk;
//Output0Buffer.outRecid = filteredRecord.Guid;
//Output0Buffer.outFromDate = _start;
//Output0Buffer.outToDate = _end;
buffer.SetString(5,filteredRecord.Source);
buffer.SetString(6,filteredRecord.Nk);
buffer.SetGuid(7,filteredRecord.Guid);
buffer.SetDateTime(8,filteredRecord.FromDate);
buffer.SetDateTime(9,filteredRecord.ToDate);
_start = _end;
_end = filteredRecord.ToDate;
if (_start == _end) filteredRecord.WriteComplete = true;
}
}
}
}
}
public class Record
{
public Guid Guid { get; set; }
public string Nk { get; set; }
public string Source { get; set; }
public DateTime FromDate { get; set; }
public DateTime ToDate { get; set; }
public bool WriteComplete { get; set; }
}
在我的 ProcessArray 方法中,我尝试填充输出缓冲区。我什至不确定这是否可以做到。
任何指导将不胜感激。
谢谢
Am a newbie to to SSIS Custom Component. Just started to code a component where the input row count will never be same as output row count. foreach input row it does some validation and generates n rows which need to be mapped to output buffer.
So after with Design time validation coding everything is fine.
my RunTime code is as below:
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
inputBufferColumnIndex = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
IDTSInputColumn100 column = input.InputColumnCollection[x];
inputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID (input.Buffer, column.LineageID);
}
IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
outputBufferColumnIndex = new int[output.OutputColumnCollection.Count];
for (int x = 0; x < output.OutputColumnCollection.Count; x++)
{
IDTSOutputColumn100 outcol = output.OutputColumnCollection[x];
outputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID(input.Buffer, outcol.LineageID);
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
if(!buffer.EndOfRowset)
{
while (buffer.NextRow())
{
var rec = new Record
{
Source = buffer[0].ToString(),
Nk = buffer[1].ToString(),
Guid = new Guid(buffer[2].ToString()),
FromDate = Convert.ToDateTime(buffer[3].ToString()),
ToDate = Convert.ToDateTime(buffer[4].ToString())
};
sourceRecords.Add(rec);
}
ProcessArray(sourceRecords,buffer);
}
}
public void ProcessArray(List<Record> records, PipelineBuffer buffer)
{
//Get Distinct NKs from the source Records
List<string> nKs = (from c in records select c.Nk).Distinct().ToList();
foreach (var nk in nKs)
{
//Get all the record for particular NK
List<Record> filteredRecords = (from c in sourceRecords where c.Nk == nk select c)
.OrderBy(c => c.Source)
.ThenBy(c => c.FromDate)
.ThenBy(c => c.ToDate).ToList();
foreach (var filteredRecord in filteredRecords)
{
_start = filteredRecord.FromDate;
_end = filteredRecord.ToDate;
while (filteredRecord.WriteComplete == false)
{
foreach (var record in filteredRecords)
{
if (record.FromDate > _start && record.FromDate < _end) _end = record.ToDate;
if (record.ToDate < _end && record.ToDate > _start) _end = record.ToDate;
}
//Output0Buffer.AddRow();
//Output0Buffer.outSource = filteredRecord.Source;
//Output0Buffer.outNK = filteredRecord.Nk;
//Output0Buffer.outRecid = filteredRecord.Guid;
//Output0Buffer.outFromDate = _start;
//Output0Buffer.outToDate = _end;
buffer.SetString(5,filteredRecord.Source);
buffer.SetString(6,filteredRecord.Nk);
buffer.SetGuid(7,filteredRecord.Guid);
buffer.SetDateTime(8,filteredRecord.FromDate);
buffer.SetDateTime(9,filteredRecord.ToDate);
_start = _end;
_end = filteredRecord.ToDate;
if (_start == _end) filteredRecord.WriteComplete = true;
}
}
}
}
}
public class Record
{
public Guid Guid { get; set; }
public string Nk { get; set; }
public string Source { get; set; }
public DateTime FromDate { get; set; }
public DateTime ToDate { get; set; }
public bool WriteComplete { get; set; }
}
In my ProcessArray method am trying to populate the output buffer. Am not even sure this can be done.
Any guidance will be appreciated.
Thanks
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
是的,可以完成这种类型的转换,它称为异步转换。你的代码对我来说看起来不错。从您的问题中并不清楚您是否遇到了特定问题。
您可能想要尝试创建异步脚本组件转换,这样您就不必摸索所有 SSIS 管道。
更多信息在这里:
http://msdn.microsoft.com/en-us/library/ms136133。 aspx
http://msdn.microsoft.com/en-us /library/ms135931.aspx
Yes this type of transform can be done, it is called an asynchronous transform. Your code looks good to me. it wasn't clear from your question if you were experiencing a specific problem.
You may want to try and create a Asynchronous Script Component transform so you don't have to fumble with all of the SSIS plumbing.
more info here:
http://msdn.microsoft.com/en-us/library/ms136133.aspx
http://msdn.microsoft.com/en-us/library/ms135931.aspx
我不确定我是否理解您想要实现的目标,但看起来您正在尝试对所有数据进行排序,然后按顺序处理排序后的列表。请注意,您的 ProcessInput 方法被调用多次,每次都有一个新的缓冲区。您对接收缓冲区所做的任何排序仅适用于该特定缓冲区 - 数据不会全局排序,因此您的结果可能会根据缓冲区边界而有所不同。
这对于特定场景来说可以吗?如果没有,请使用排序变换对所有数据进行排序,在排序后添加变换,然后逐行处理数据 - 它已经排序了。因此,只需逐行读取它,然后在读取后修改当前行 - 这就是 buffer.SetString 的用途。
另外,不要对列索引进行硬编码,例如 buffer.SetString(5, ...) - 数字可能会改变,最好在 PreExecute 中获取并保存列索引,然后使用类似的东西
buffer.SetString(nkColumnIndex, nkColumnValue);
I'm not sure I understand what you are trying to achieve, but looks like you are trying to sort all the data and then sequentially process the sorted list. Note that your ProcessInput method is called multiple times, each with a new buffer. Any sorting you do with received buffer only applies to this particular buffer - the data is not sorted globally, so your results may vary depending on buffer boundaries.
Is this OK for particular scenario? If not, use Sort transform to sort all the data for you, add your transform after Sort and just process the data row by row - it is already sorted. So just read it row-by-row, then modify the current row after reading - this is what buffer.SetString is for.
Also, don't hardcode column indixes, like buffer.SetString(5, ...) - the numbers may change, it is better to get and save the column index in PreExecute, then use something like
buffer.SetString(nkColumnIndex, nkColumnValue);