La documentación de ParallelRunStep sugiere lo siguiente:
Un conjunto de datos de entrada con nombre (clase DatasetConsumptionConfig)
path_on_datastore = iris_data.path('iris/')
input_iris_ds = Dataset.Tabular.from_delimited_files(path=path_on_datastore, validate=False)
named_iris_ds = input_iris_ds.as_named_input(iris_ds_name)
Que se acaba de pasar como entrada:
distributed_csv_iris_step = ParallelRunStep(
name='example-iris',
inputs=[named_iris_ds],
output=output_folder,
parallel_run_config=parallel_run_config,
arguments=['--model_name', 'iris-prs'],
allow_reuse=False
)
La documentación para enviar entradas de conjuntos de datos como parámetros sugiere lo siguiente:
La entrada es un elemento de clase DatasetConsumptionConfig
tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
Que se pasa tanto en argumentos como en entradas
train_step = PythonScriptStep(
name="train_step",
script_name="train_with_dataset.py",
arguments=["--param2", tabular_ds_consumption],
inputs=[tabular_ds_consumption],
compute_target=compute_target,
source_directory=source_directory)
Al enviar con un nuevo parámetro, creamos una nueva clase de conjunto de datos:
iris_tabular_ds = Dataset.Tabular.from_delimited_files('some_link')
Y envíalo así:
pipeline_run_with_params = experiment.submit(pipeline, pipeline_parameters={'tabular_ds_param': iris_tabular_ds})
Sin embargo, ¿cómo combinamos esto? ¿Cómo pasamos una entrada del conjunto de datos como parámetro al ParallelRunStep?
Si creamos un elemento de clase DatasetConsumptionConfig así:
tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv')
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
Y páselo como argumento en ParallelRunStep, arrojará un error.
Referencias:
Cuaderno con parámetro de entrada del conjunto de datos
Cuaderno ParallelRunStep
------------------------------------
AML ParallelRunStep GA es una solución administrada para ampliar y ampliar grandes cargas de trabajo de aprendizaje automático, incluida la inferencia por lotes, la capacitación y el procesamiento de grandes datos. Consulte los documentos a continuación para obtener más detalles.
• Documento de descripción general: ejecutar inferencia por lotes usando ParallelRunStep
• Cuadernos de muestra
• AI Show: Cómo hacer inferencia por lotes usando AML ParallelRunStep
• Blog: Inferencia por lotes en Azure Machine Learning
------------------------------------
Para las entradas creamos instancias de clase Dataset:
tabular_ds1 = Dataset.Tabular.from_delimited_files('some_link')
tabular_ds2 = Dataset.Tabular.from_delimited_files('some_link')
ParallelRunStep produce un archivo de salida, usamos la clase PipelineData para crear una carpeta que almacenará esta salida:
from azureml.pipeline.core import Pipeline, PipelineData
output_dir = PipelineData(name="inferences", datastore=def_data_store)
ParallelRunStep depende de la clase ParallelRunConfig para incluir detalles sobre el entorno, el script de entrada, el nombre del archivo de salida y otras definiciones necesarias:
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
parallel_run_config = ParallelRunConfig(
source_directory=scripts_folder,
entry_script=script_file,
mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
error_threshold=10,
output_action="append_row",
append_row_file_name="mnist_outputs.txt",
environment=batch_env,
compute_target=compute_target,
process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
node_count=2
)
La entrada a ParallelRunStep se crea usando el siguiente código
tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_ds1)
tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
El PipelineParameter nos ayuda a ejecutar el pipeline para diferentes conjuntos de datos.
ParallelRunStep consume esto como entrada:
parallelrun_step = ParallelRunStep(
name="some-name",
parallel_run_config=parallel_run_config,
inputs=[ tabular_ds_consumption ],
output=output_dir,
allow_reuse=False
)
Para consumir con otro conjunto de datos:
pipeline_run_2 = experiment.submit(pipeline,
pipeline_parameters={"tabular_ds_param": tabular_ds2}
)
Actualmente hay un error: DatasetConsumptionConfig y PipelineParameter no se pueden reutilizar