#sql-server #ssis
#sql-сервер #ssis
Вопрос:
Я все еще изучаю, как создавать пользовательские компоненты в SSIS. Допустим, у меня количество входных строк 500 строк, мне нужно одновременно считывать / пакетировать / обрабатывать 100 строк из буфера входного конвейера, а затем отправлять их в стороннее приложение, как только я получу результаты, мне нужно обновить столбцы буфера конвейера новыми данными, а затем прочитать / пакетировать / обработать следующие 100 строк и так далее, пока я не обработаю все 500 строк.
Мой вопрос в том, могу ли я выполнить цикл / прочитать буфер конвейера ввода более одного раза, чтобы я мог обновить буфер возвращенными данными из стороннего приложения?
Мне показалось, я читал, что вы можете прочитать все данные и сохранить их в кэше, а затем, например, отсортировать данные, но я не уверен, как вернуть эти данные на вывод из кэша. Я также не уверен, где это должно быть сделано и как получить доступ к буферу конвейера ввода, PrimeOutput или ProcessInput или другому методу переопределения, о котором я не знаю?
Я пытаюсь создать пользовательский компонент асинхронного потока данных для решения этой проблемы.
Любая помощь или идеи будут с благодарностью приняты и / или укажут мне правильное направление!
Спасибо!
Комментарии:
1. Возврат назад и обновление строк противоречит идее конвейера SSIS. Почему бы не разбить эти этапы на конвейер, чтобы SSIS могла применять свою собственную логику буферизации? Ваш компонент может обрабатывать 100 строк за раз, если захочет, но затем просто выводит строку плюс новые / обновленные данные.
2. @JeroenMostert спасибо за ответ! Я не уверен, что полностью понимаю. Итак, я могу создать новый буфер конвейера? Или просто прокручивает входной буфер конвейера в ProcessInput() по 100 строк за раз, а затем отправляет этот пакет третьей стороне, а затем помещает возвращаемые результаты в отдельный выходной буфер, чтобы мне не приходилось проходить через входной буфер? Извините, я все еще новичок и пытаюсь понять все это.
3. Верно, неважно. Я был смущен моделью программирования SSIS (прошло некоторое время с тех пор, как я ее видел). Конечно, вы не можете просто передавать строки на вывод по мере необходимости, это было бы просто и интуитивно. 😛
4. Компонент синхронизации в SSIS — это тот, который имеет сопоставление 1: 1 между исходными строками и выводом. Асинхронность — это то, что вы определяете. Компонент сортировки по умолчанию имеет соотношение 1: 1, но выходные данные находятся в буфере, отличном от входного буфера. Похоже, что ваш процесс выполняет пакетную обработку до 100 строк, выполняет работу , добавляет эти 100 строк обогащение работы в выходной буфер. Чего я не вижу, так это того, где вы будете «зацикливаться» на исходных строках. Можете ли вы немного расширить эту идею?
5. @billinkc Да, у меня есть источник Ole DB, который возвращает 500 строк. Мне нужно пакетировать 100 строк за раз и отправлять их в пакетном режиме в API проверки адресов Smarty Streets, а затем я получу возвращаемые результаты обратно. Отсюда я в тупике, идея цикла заключалась в том, чтобы вернуться через буфер конвейера ввода и обновить возвращаемые данные. Похоже, мне не нужно повторно использовать входной буфер конвейера, так где или что мне нужно сделать, чтобы получить эти данные в выходной буфер? Где или как я могу получить доступ к этому буферу? Что касается того, где я буду выполнять цикл, это будет в методе ProcessInput().
Ответ №1:
Я рад, что не пытался сделать это от руки, поскольку было множество тонких моментов, о которых я забыл.
Здесь стоит отметить несколько моментов: мои две структуры данных InData
и OutData
, и вам нужно будет настроить их, чтобы отслеживать все, что будет находиться в буферах ввода / вывода. Как указано в комментариях, может быть разумный способ клонировать свойства объектов буфера, но я не видел, как это сделать. Определите их в соответствии с типами данных в вашем потоке данных, и если вы ленивы, как я, используйте те же имена столбцов, и вы сможете скопировать / вставить свой путь к успеху.
ApiCall
это фиктивный метод, который использует наши кэшированные значения, чтобы попросить службу очистки данных выполнить свою задачу. Он должен возвращать очищенные данные, чтобы мы могли объединить входные и выходные данные в единую строку. Вероятно, есть лучший способ сделать это, но, надеюсь, он достаточно хорош для запуска ваших мыслительных процессов.
Я создал переменную уровня SSIS, @[User::ApiBatchSize]
которую вы бы инициализировали значением 500. Использование этого подхода позволит вам оптимизировать размер отправляемого пакета без изменения основного кода. Я инициализирую нашу локальную переменную-член в PreExecute
методе, потому что это конструктор для компонента сценария.
Обычно в компоненте асинхронного скрипта вы работаете с ProcessInputRow
методом, и это то, что я изначально делал, но столкнулся с проблемой с окончательным пакетом, если размер списка даже кратен apiBatchSize . Оказывается, EndOfRowset()
в методе никогда не устанавливалось значение True. Не беспокойтесь, нам просто нужно работать с ProcessInput
методом. В «обычном» мире метод ввода процесса приводит к тому, что строка ввода процесса обрабатывает строку, поэтому мы собираемся пропустить посредника и просто работать напрямую с буфером в ProcessInput. Я был ленив и не переименовал свои Row
ссылки на Buffer
, поскольку автоматически сгенерированный код изначально адресовал параметр.
Псевдологика здесь
- При наличии строки данных
-
- если мы достигли нужного размера пакета, отправьте наш сбор данных на обработку
-
-
- Для каждой обработанной строки добавьте строку в выходной буфер и заполните его чистыми данными
-
-
- Очистите нашу корзину сбора (она уже отправлена по потоку)
- Добавьте текущую строку в нашу корзину сбора
Сам C #
using System;
using System.Data;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct InData
{
public string AddressLine1 { get; set; }
}
/// <summary>
/// There might be a clever way to re-use the metadata from the Input/OutputBuffer
/// definition but I don't know how to access it so I redefine it here
/// </summary>
public struct OutData
{
public string AddressLine1Clean { get; set; }
public string AddressCityClean { get; set; }
public string AddressStateClean { get; set; }
public string AddressPostalCodeClean { get; set; }
}
/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
List<InData> mData;
int mBatchSize;
/// <summary>
/// This method is called once, before rows begin to be processed in the data flow.
///
/// You can remove this method if you don't need to do anything here.
/// </summary>
public override void PreExecute()
{
base.PreExecute();
this.mData = new List<InData>();
this.mBatchSize = this.Variables.ApiBatchSize;
}
/// <summary>
/// This method is called after all the rows have passed through this component.
///
/// You can delete this method if you don't need to do anything here.
/// </summary>
public override void PostExecute()
{
base.PostExecute();
}
/// <summary>
/// We're going to work with ProcessInput versus PorcessInputRow as it is
/// "closer to the bare metal" and we need that
/// </summary>
/// <param name="Buffer"></param>
public override void Input0_ProcessInput(Input0Buffer Row)
{
//base.Input0_ProcessInput(Buffer);
while (Row.NextRow())
{
if (this.mData.Count >= this.mBatchSize)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Batch released. Conditions => mDataCount := " this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
this.mData.Add(new InData() { AddressLine1 = Row.AddressLine1 });
}
// Handle the final possible partial batch
if (this.mData.Count > 0)
{
foreach (var item in ApiCall())
{
Output0Buffer.AddRow();
var inRow = item.Key;
var outRow = item.Value;
// fill columns with original data
Output0Buffer.AddressLine1 = inRow.AddressLine1;
// etc
// fill columns with clean data
Output0Buffer.AddressLine1Clean = outRow.AddressLine1Clean;
Output0Buffer.AddressCityClean = outRow.AddressCityClean;
Output0Buffer.AddressStateClean = outRow.AddressStateClean;
Output0Buffer.AddressPostalCodeClean = outRow.AddressPostalCodeClean;
// etc
}
// TODO Remove this for production, just ensuring batching is working as intended
bool fireAgain = false;
string status = "Final batch released. Conditions => mDataCount := " this.mData.Count;
this.ComponentMetaData.FireInformation(0, "ApiProcessing", status, "", 0, ref fireAgain);
// Reset for next iteration
this.mData.Clear();
}
}
///// <summary>
///// This method is called once for every row that passes through the component from Input0.
///// We need to preserve rows in our own memory allocation
///// We're not getting the EndOfRowset call in time to release the final
///// </summary>
///// <param name="Row">The row that is currently passing through the component</param>
//public override void Input0_ProcessInputRow(Input0Buffer Row)
//{
//}
public override void CreateNewOutputRows()
{
// I don't think we need to do anything special here
// but I'm leaving it in in case you have some weird case
}
/// <summary>
/// Simulate data cleaning
/// </summary>
/// <returns></returns>
public Dictionary<InData, OutData> ApiCall()
{
int macGuffin = 0;
Dictionary<InData, OutData> cleanData = new Dictionary<InData, OutData>();
foreach (var item in this.mData)
{
cleanData.Add(item, new OutData() { AddressLine1Clean = "Clean" item.AddressLine1, AddressCityClean = "Clean", AddressPostalCodeClean = "12345-1234", AddressStateClean = "CL" });
macGuffin = macGuffin % this.mBatchSize;
}
return cleanData;
}
}
Скриншоты компонента сценария
Здесь мы делаем переменные уровня SSIS доступными для компонента сценария. Я выбрал ApiBatchSize
На вкладке Входные столбцы я выбрал все столбцы, которые необходимо пропустить, и пометил их как тип использования только для чтения.
На вкладке «Входы и выходы» первое, что я делаю, это переход к Output 0
и изменение SynchronousInputID
чего-то вроде «Компонента сценария».Входные данные [Вход 0]» нет
Определите все столбцы, которые вам понадобятся. Я дублирую свои исходные столбцы (AddressLine1), а затем добавляю все новые столбцы, которые сможет заполнить моя обработка (AddressLine1Clean, город / штат / почтовый индекс). В разделе Вывод 0 выберите коллекцию выходных столбцов и повторно нажмите «Добавить столбец» и настройте. Помимо указания имени, я изменил все типы данных на string (DT_STR) здесь, поскольку это то, с чем я работаю. По умолчанию используется 32-разрядный целочисленный тип (DT_I4)
Обратите внимание, что на этом скриншоте нет исходных столбцов, но вам нужно будет добавить их для работы кода.
Там могут быть более новые книги, но эта вышедшая из печати книга менеджера программ на момент появления SSIS — это то, на что я все еще ссылаюсь, когда сталкиваюсь с вопросами о сценариях.
The Rational Guide to Scripting SQL Server 2005 Integration Services Beta Preview (Rational Guides) Дональда Фармера, Дерека Фармера в мягкой обложке, 192 страницы, опубликовано 2005 ISBN-10: 1-932577-21-1 / 1932577211 ISBN-13: 978-1-932577-21-1 / 9781932577211