Parallel processing in ABAP runs tasks simultaneously across multiple work processes. Asynchronous RFC calls (aRFC) with STARTING NEW TASK can significantly speed up data-intensive operations.
Basic Concept
| Statement | Description |
|---|---|
| STARTING NEW TASK | Starts an asynchronous RFC |
| CALLING ... ON END OF TASK | Callback when the task ends (PERFORMING ... ON END OF TASK for subroutines) |
| WAIT UNTIL | Waits until a condition on the results is met |
| RECEIVE RESULTS | Receives the return values |
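As a rough illustration of how the four statements in the table fit together, the following report calls the standard function module RFC_SYSTEM_INFO asynchronously. It is only a sketch: the report name, the task name INFO, the subroutine name and the 30-second timeout are arbitrary choices made for this example. Because the callback is a subroutine, it is registered with PERFORMING rather than CALLING.

```abap
REPORT zdemo_arfc_system_info.

DATA: gs_info TYPE rfcsi,
      gv_done TYPE abap_bool,
      gv_msg  TYPE c LENGTH 255.

START-OF-SELECTION.
  " Start the function module in a new session on the local server
  CALL FUNCTION 'RFC_SYSTEM_INFO'
    STARTING NEW TASK 'INFO'
    DESTINATION 'NONE'
    PERFORMING on_info_received ON END OF TASK
    EXCEPTIONS
      communication_failure = 1 MESSAGE gv_msg
      system_failure        = 2 MESSAGE gv_msg
      OTHERS                = 3.
  IF sy-subrc <> 0.
    WRITE: / 'Task could not be started:', gv_msg.
    RETURN.
  ENDIF.

  " The callback only runs while the program waits here
  WAIT UNTIL gv_done = abap_true UP TO 30 SECONDS.

  IF gv_done = abap_true.
    WRITE: / 'System ID:', gs_info-rfcsysid.
  ELSE.
    WRITE: / 'Timeout'.
  ENDIF.

FORM on_info_received USING p_task TYPE clike.
  " Fetch the export parameters of the finished task
  RECEIVE RESULTS FROM FUNCTION 'RFC_SYSTEM_INFO'
    IMPORTING
      rfcsi_export = gs_info
    EXCEPTIONS
      communication_failure = 1 MESSAGE gv_msg
      system_failure        = 2 MESSAGE gv_msg.
  gv_done = abap_true.
ENDFORM.
```

The flag gv_done is set even when RECEIVE RESULTS fails, so the main program does not wait for the full timeout after an error; a real program would also inspect sy-subrc in the callback.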
Syntax

    " Asynchronous call
    CALL FUNCTION 'FUNC_NAME'
      STARTING NEW TASK task_name
      DESTINATION destination
      CALLING callback_method ON END OF TASK
      EXPORTING
        param = value
      TABLES
        itab = lt_data.

    " Wait for the results
    WAIT UNTIL lv_counter >= lv_expected.

    " Fetch the results (inside the callback)
    RECEIVE RESULTS FROM FUNCTION 'FUNC_NAME'
      IMPORTING
        result = lv_result
      TABLES
        itab = lt_result.

Examples
1. Simple Asynchronous Call

    REPORT zparallel_simple.

    DATA: gv_result TYPE string,
          gv_done   TYPE abap_bool.

    START-OF-SELECTION.
      " Start the asynchronous task. The callback is a subroutine,
      " so it is registered with PERFORMING (CALLING is for methods).
      CALL FUNCTION 'Z_LONG_RUNNING_TASK'
        STARTING NEW TASK 'TASK1'
        DESTINATION 'NONE'
        PERFORMING on_task_complete ON END OF TASK
        EXPORTING
          iv_input = 'Test'.

      " Wait for the result
      WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

      IF gv_done = abap_true.
        WRITE: / 'Result:', gv_result.
      ELSE.
        WRITE: / 'Timeout!'.
      ENDIF.

    FORM on_task_complete USING p_task TYPE clike.
      RECEIVE RESULTS FROM FUNCTION 'Z_LONG_RUNNING_TASK'
        IMPORTING
          ev_result = gv_result.
      gv_done = abap_true.
    ENDFORM.

2. Parallel Processing with a Class
    CLASS zcl_parallel_processor DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_task,
                 id     TYPE i,
                 status TYPE string,
                 result TYPE string,
               END OF ty_task,
               ty_tasks TYPE STANDARD TABLE OF ty_task WITH KEY id.

        METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_tasks.

        " Callback for ON END OF TASK: takes a single parameter p_task
        METHODS on_task_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_tasks     TYPE ty_tasks,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_processor IMPLEMENTATION.
      METHOD run_parallel.
        mv_total = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_tasks.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          " Build the task name in a variable before the call
          DATA(lv_task_name) = |TASK{ lv_task_id }|.

          APPEND VALUE #( id = lv_task_id status = 'RUNNING' ) TO mt_tasks.

          " Asynchronous call
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_task_complete ON END OF TASK
            EXPORTING
              iv_item    = lv_item
              iv_task_id = lv_task_id.
        ENDLOOP.

        " Wait until all tasks have finished
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.

        rt_results = mt_tasks.
      ENDMETHOD.

      METHOD on_task_complete.
        DATA: lv_result  TYPE string,
              lv_task_id TYPE i.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          IMPORTING
            ev_result  = lv_result
            ev_task_id = lv_task_id.

        " Store the result
        MODIFY mt_tasks FROM VALUE #( id = lv_task_id status = 'COMPLETED' result = lv_result )
          TRANSPORTING status result WHERE id = lv_task_id.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

3. Batch Processing with Chunks
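The batch example in this section hinges on two small calculations: how many chunks a given input produces, and how the rows are distributed over them. The following sketch shows both in isolation. It assumes a hypothetical local class lcl_chunker that works on string_table; the class, method and report names are illustrative and not part of the original example.

```abap
REPORT zdemo_chunking.

CLASS lcl_chunker DEFINITION FINAL.
  PUBLIC SECTION.
    TYPES ty_chunks TYPE STANDARD TABLE OF string_table WITH EMPTY KEY.

    CLASS-METHODS chunk_count
      IMPORTING iv_total        TYPE i
                iv_size         TYPE i
      RETURNING VALUE(rv_count) TYPE i.

    CLASS-METHODS split
      IMPORTING it_items         TYPE string_table
                iv_size          TYPE i
      RETURNING VALUE(rt_chunks) TYPE ty_chunks.
ENDCLASS.

CLASS lcl_chunker IMPLEMENTATION.
  METHOD chunk_count.
    IF iv_total <= 0 OR iv_size <= 0.
      RETURN. " rv_count stays 0
    ENDIF.
    " Ceiling division with integer arithmetic
    rv_count = ( iv_total + iv_size - 1 ) DIV iv_size.
  ENDMETHOD.

  METHOD split.
    DATA lt_chunk TYPE string_table.

    IF iv_size <= 0.
      RETURN.
    ENDIF.

    LOOP AT it_items INTO DATA(lv_item).
      APPEND lv_item TO lt_chunk.
      IF lines( lt_chunk ) = iv_size.
        APPEND lt_chunk TO rt_chunks. " chunk is full
        CLEAR lt_chunk.
      ENDIF.
    ENDLOOP.

    IF lt_chunk IS NOT INITIAL.
      APPEND lt_chunk TO rt_chunks.   " last, partially filled chunk
    ENDIF.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
  DATA(lt_items) = VALUE string_table( ( `A` ) ( `B` ) ( `C` ) ( `D` ) ( `E` ) ).
  DATA(lt_chunks) = lcl_chunker=>split( it_items = lt_items iv_size = 2 ).

  WRITE: / |Expected chunks: { lcl_chunker=>chunk_count( iv_total = lines( lt_items ) iv_size = 2 ) }|,
         / |Actual chunks: { lines( lt_chunks ) }|.
```

For five items and a chunk size of two, both values are 3. The form `( total + size - 1 ) DIV size` avoids the off-by-one that `total / size + 1` produces when the total is an exact multiple of the chunk size, which matters when the caller waits for a precomputed number of tasks.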
    " ty_data_tab and ty_result_tab are assumed to be defined elsewhere,
    " for example in the ABAP Dictionary
    CLASS zcl_batch_parallel DEFINITION.
      PUBLIC SECTION.
        CONSTANTS c_max_tasks TYPE i VALUE 10.

        METHODS process_in_batches
          IMPORTING it_data           TYPE ty_data_tab
          RETURNING VALUE(rt_results) TYPE ty_result_tab.

        " Callback for ON END OF TASK (declared public here)
        METHODS on_batch_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_active_tasks TYPE i,   " number of tasks started so far
              mv_completed    TYPE i,   " number of tasks that have reported back
              mt_results      TYPE ty_result_tab.
    ENDCLASS.

    CLASS zcl_batch_parallel IMPLEMENTATION.
      METHOD process_in_batches.
        DATA: lt_chunk      TYPE ty_data_tab,
              lv_chunk_size TYPE i,
              lv_task_no    TYPE i.

        " Chunk size: rounded-up quotient, so that at most c_max_tasks chunks result
        lv_chunk_size = COND #( WHEN lines( it_data ) <= c_max_tasks
                                THEN 1
                                ELSE ( lines( it_data ) + c_max_tasks - 1 ) DIV c_max_tasks ).

        mv_active_tasks = 0.
        mv_completed = 0.
        CLEAR mt_results.

        " Split the data into chunks and process them in parallel
        DATA(lv_start) = 1.
        WHILE lv_start <= lines( it_data ).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |BATCH{ lv_task_no }|.

          " Build the chunk
          CLEAR lt_chunk.
          LOOP AT it_data INTO DATA(ls_data) FROM lv_start.
            APPEND ls_data TO lt_chunk.
            IF lines( lt_chunk ) >= lv_chunk_size.
              EXIT.
            ENDIF.
          ENDLOOP.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_BATCH'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_batch_complete ON END OF TASK
            EXPORTING
              iv_batch_id = lv_task_no
            TABLES
              it_data     = lt_chunk.

          mv_active_tasks = mv_active_tasks + 1.
          lv_start = lv_start + lv_chunk_size.

          " Limit the number of simultaneously running tasks:
          " running = started - completed
          IF mv_active_tasks - mv_completed >= c_max_tasks.
            WAIT UNTIL mv_active_tasks - mv_completed < c_max_tasks UP TO 60 SECONDS.
          ENDIF.
        ENDWHILE.

        " Wait for all remaining tasks
        WAIT UNTIL mv_completed >= mv_active_tasks UP TO 300 SECONDS.

        rt_results = mt_results.
      ENDMETHOD.

      METHOD on_batch_complete.
        DATA lt_batch_results TYPE ty_result_tab.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_BATCH'
          TABLES
            et_results = lt_batch_results.

        APPEND LINES OF lt_batch_results TO mt_results.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

4. Using a Server Group
    DATA: lv_group    TYPE rzlli_apts VALUE 'parallel_generators',
          lv_free_wps TYPE i,
          lv_message  TYPE c LENGTH 255.

    " Initialize the server group.
    " Inline declarations are not possible for function module parameters.
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = lv_group
      IMPORTING
        free_pbt_wps                   = lv_free_wps
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.

    IF sy-subrc <> 0 AND sy-subrc <> 3.
      " Fallback: leave the group name initial, which is expected to behave
      " like IN GROUP DEFAULT (verify this on your release)
      CLEAR lv_group.
    ENDIF.

    " Call using the server group
    CALL FUNCTION 'Z_HEAVY_CALCULATION'
      STARTING NEW TASK 'CALC1'
      DESTINATION IN GROUP lv_group
      CALLING on_calculation_done ON END OF TASK
      EXPORTING
        iv_param = lv_param
      EXCEPTIONS
        communication_failure = 1 MESSAGE lv_message
        system_failure        = 2 MESSAGE lv_message
        resource_failure      = 3.

5. Resource Management
    CLASS zcl_resource_manager DEFINITION.
      PUBLIC SECTION.
        METHODS get_available_processes
          RETURNING VALUE(rv_count) TYPE i.

        METHODS process_with_throttling
          IMPORTING it_items TYPE string_table.

        " Callback for ON END OF TASK
        METHODS on_item_done
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_max_parallel TYPE i VALUE 5,
              mv_running      TYPE i,   " number of tasks started so far
              mv_completed    TYPE i.
    ENDCLASS.

    CLASS zcl_resource_manager IMPLEMENTATION.
      METHOD get_available_processes.
        DATA lv_group TYPE rzlli_apts VALUE 'parallel_generators'.

        " Determine the free work processes
        CALL FUNCTION 'SPBT_INITIALIZE'
          EXPORTING
            group_name   = lv_group
          IMPORTING
            free_pbt_wps = rv_count
          EXCEPTIONS
            OTHERS       = 1.

        IF sy-subrc <> 0 OR rv_count < 1.
          rv_count = 3. " Fallback
        ENDIF.

        " Cap at the configured maximum
        IF rv_count > mv_max_parallel.
          rv_count = mv_max_parallel.
        ENDIF.
      ENDMETHOD.

      METHOD process_with_throttling.
        DATA(lv_max) = get_available_processes( ).
        mv_running = 0.
        mv_completed = 0.

        DATA(lv_task_no) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROC{ lv_task_no }|.

          " Wait while the maximum number of running tasks is reached
          WHILE mv_running - mv_completed >= lv_max.
            WAIT UNTIL mv_running - mv_completed < lv_max UP TO 5 SECONDS.
          ENDWHILE.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_item_done ON END OF TASK
            EXPORTING
              iv_item = lv_item.

          mv_running = mv_running + 1.
        ENDLOOP.

        " Wait for all tasks
        WAIT UNTIL mv_completed >= mv_running UP TO 600 SECONDS.
      ENDMETHOD.

      METHOD on_item_done.
        " Fetch the results; failed tasks are simply counted as completed here
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

6. Error Handling
    CLASS zcl_parallel_with_errors DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_result,
                 task_id TYPE i,
                 success TYPE abap_bool,
                 message TYPE string,
                 data    TYPE string,
               END OF ty_result,
               ty_results TYPE STANDARD TABLE OF ty_result WITH KEY task_id.

        METHODS process_with_error_handling
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_results.

        " Callback with error handling. It is a method rather than a subroutine
        " because it updates the private attributes of the class.
        METHODS on_risky_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_results   TYPE ty_results,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_with_errors IMPLEMENTATION.
      METHOD process_with_error_handling.
        DATA: lv_message TYPE c LENGTH 255,
              lv_start   TYPE timestamp,
              lv_now     TYPE timestamp.

        mv_total = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_results.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          DATA(lv_task_name) = |RISK{ lv_task_id }|.

          " Register the task as pending before it is started
          APPEND VALUE #( task_id = lv_task_id message = 'Pending' ) TO mt_results.

          " Start errors are reported as classic exceptions (sy-subrc),
          " so TRY ... CATCH would not see them
          CALL FUNCTION 'Z_RISKY_OPERATION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_risky_complete ON END OF TASK
            EXPORTING
              iv_item    = lv_item
              iv_task_id = lv_task_id
            EXCEPTIONS
              communication_failure = 1 MESSAGE lv_message
              system_failure        = 2 MESSAGE lv_message
              OTHERS                = 3.

          IF sy-subrc <> 0.
            " Record the start error
            MODIFY mt_results FROM VALUE #( message = |Start failed: { lv_message }| )
              TRANSPORTING message WHERE task_id = lv_task_id.
            mv_completed = mv_completed + 1.
          ENDIF.
        ENDLOOP.

        " Wait with an overall timeout of 5 minutes. Time stamps are used
        " because sy-uzeit is not refreshed automatically and wraps at midnight.
        GET TIME STAMP FIELD lv_start.
        WHILE mv_completed < mv_total.
          WAIT UNTIL mv_completed >= mv_total UP TO 5 SECONDS.

          GET TIME STAMP FIELD lv_now.
          IF cl_abap_tstmp=>subtract( tstmp1 = lv_now tstmp2 = lv_start ) > 300.
            " Mark the tasks that have not reported back as timed out
            MODIFY mt_results FROM VALUE #( message = 'Timeout' )
              TRANSPORTING message WHERE message = 'Pending'.
            EXIT.
          ENDIF.
        ENDWHILE.

        rt_results = mt_results.
      ENDMETHOD.

      METHOD on_risky_complete.
        DATA: lv_result  TYPE string,
              lv_message TYPE c LENGTH 255.

        " The task name is RISK<n>. The task ID is derived from it so that it
        " is also known when RECEIVE RESULTS fails.
        DATA(lv_task_id) = CONV i( substring( val = p_task off = 4 ) ).

        RECEIVE RESULTS FROM FUNCTION 'Z_RISKY_OPERATION'
          IMPORTING
            ev_result = lv_result
          EXCEPTIONS
            communication_failure = 1 MESSAGE lv_message
            system_failure        = 2 MESSAGE lv_message
            OTHERS                = 3.
        DATA(lv_subrc) = sy-subrc.

        IF lv_subrc = 0.
          MODIFY mt_results FROM VALUE #( success = abap_true data = lv_result )
            TRANSPORTING success message data WHERE task_id = lv_task_id.
        ELSE.
          MODIFY mt_results FROM VALUE #( message = COND #( WHEN lv_message IS NOT INITIAL
                                                            THEN lv_message
                                                            ELSE |Error { lv_subrc }| ) )
            TRANSPORTING message WHERE task_id = lv_task_id.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

7. RFC Function Module for Parallelization
    FUNCTION z_parallel_worker.
    *"----------------------------------------------------------------------
    *"*"Local interface:
    *"  IMPORTING
    *"     VALUE(IV_TASK_ID) TYPE  I
    *"     VALUE(IT_DATA) TYPE  TY_DATA_TAB
    *"  EXPORTING
    *"     VALUE(ET_RESULTS) TYPE  TY_RESULT_TAB
    *"     VALUE(EV_TASK_ID) TYPE  I
    *"----------------------------------------------------------------------

      " Return the task ID so the caller can match results to tasks
      ev_task_id = iv_task_id.

      LOOP AT it_data INTO DATA(ls_data).
        " Processing
        DATA(ls_result) = VALUE ty_result( id     = ls_data-id
                                           status = 'PROCESSED'
                                           value  = ls_data-value * 2 ).
        APPEND ls_result TO et_results.
      ENDLOOP.

    ENDFUNCTION.

8. CL_ABAP_PARALLEL for Modern Parallelization
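The example in this section passes data to and from the parallel tasks as xstring buffers, so inputs and results have to be serialized with EXPORT ... TO DATA BUFFER and read back with IMPORT ... FROM DATA BUFFER. That round trip can be tried on its own. In this sketch, the class lcl_codec, the report name and the buffer ID item are hypothetical names chosen for illustration.

```abap
REPORT zdemo_data_buffer.

CLASS lcl_codec DEFINITION FINAL.
  PUBLIC SECTION.
    CLASS-METHODS encode
      IMPORTING iv_item          TYPE string
      RETURNING VALUE(rv_buffer) TYPE xstring.

    CLASS-METHODS decode
      IMPORTING iv_buffer      TYPE xstring
      RETURNING VALUE(rv_item) TYPE string.
ENDCLASS.

CLASS lcl_codec IMPLEMENTATION.
  METHOD encode.
    " Serialize the value under the ID 'item'
    EXPORT item = iv_item TO DATA BUFFER rv_buffer.
  ENDMETHOD.

  METHOD decode.
    " Read the value back using the same ID
    IMPORT item = rv_item FROM DATA BUFFER iv_buffer.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
  DATA(lv_buffer) = lcl_codec=>encode( `Customer 4711` ).
  WRITE: / |Buffer length in bytes: { xstrlen( lv_buffer ) }|,
         / |Decoded value: { lcl_codec=>decode( lv_buffer ) }|.
```

The ID used on both sides must match; if it does not, IMPORT leaves the target unchanged and sets sy-subrc to 4.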
    " Sketch based on the RUN/DO pattern of CL_ABAP_PARALLEL. Parameter, type
    " and field names should be checked against the class in your release.
    CLASS zcl_modern_parallel DEFINITION
      INHERITING FROM cl_abap_parallel.
      PUBLIC SECTION.
        METHODS do REDEFINITION.

        CLASS-METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE string_table.
    ENDCLASS.

    CLASS zcl_modern_parallel IMPLEMENTATION.
      METHOD do.
        " This method runs in parallel, once per input buffer
        DATA lv_item TYPE string.
        IMPORT item = lv_item FROM DATA BUFFER p_in.

        DATA(lv_result) = |Processed: { lv_item }|.

        " Serialize the result
        EXPORT result = lv_result TO DATA BUFFER p_out.
      ENDMETHOD.

      METHOD run_parallel.
        DATA: lt_in     TYPE cl_abap_parallel=>t_in_tab,
              lt_out    TYPE cl_abap_parallel=>t_out_tab,
              lv_buffer TYPE xstring,
              lv_result TYPE string.

        " Serialize each item into its own input buffer
        LOOP AT it_items INTO DATA(lv_item).
          EXPORT item = lv_item TO DATA BUFFER lv_buffer.
          APPEND lv_buffer TO lt_in.
        ENDLOOP.

        " Run in parallel
        NEW zcl_modern_parallel( )->run(
          EXPORTING p_in_tab  = lt_in
          IMPORTING p_out_tab = lt_out ).

        " Collect the results
        LOOP AT lt_out INTO DATA(ls_out).
          IF ls_out-message IS INITIAL. " no error reported for this task
            IMPORT result = lv_result FROM DATA BUFFER ls_out-result.
            IF sy-subrc = 0.
              APPEND lv_result TO rt_results.
            ENDIF.
          ENDIF.
        ENDLOOP.
      ENDMETHOD.
    ENDCLASS.

9. Progress Indicator
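The progress indicator in this section needs a percentage derived from the number of completed tasks and the total. A small defensive sketch of that calculation follows; the helper class lcl_progress and the report name are hypothetical. It guards against an empty work list and clamps the result to the range 0 to 100.

```abap
REPORT zdemo_progress_percent.

CLASS lcl_progress DEFINITION FINAL.
  PUBLIC SECTION.
    CLASS-METHODS percent
      IMPORTING iv_done           TYPE i
                iv_total          TYPE i
      RETURNING VALUE(rv_percent) TYPE i.
ENDCLASS.

CLASS lcl_progress IMPLEMENTATION.
  METHOD percent.
    IF iv_total <= 0.
      RETURN. " nothing to do: rv_percent stays 0, no division by zero
    ENDIF.

    " Integer percentage, rounded down
    rv_percent = iv_done * 100 DIV iv_total.

    " Clamp to the range accepted by a progress bar
    IF rv_percent > 100.
      rv_percent = 100.
    ELSEIF rv_percent < 0.
      rv_percent = 0.
    ENDIF.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
  WRITE: / |3 of 4 tasks: { lcl_progress=>percent( iv_done = 3 iv_total = 4 ) } %|.
```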
    CLASS zcl_parallel_progress DEFINITION.
      PUBLIC SECTION.
        METHODS process_with_progress
          IMPORTING it_items TYPE string_table.

        " Callback for ON END OF TASK
        METHODS on_progress_update
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_total     TYPE i,
              mv_completed TYPE i,
              mv_errors    TYPE i.

        METHODS update_progress.
    ENDCLASS.

    CLASS zcl_parallel_progress IMPLEMENTATION.
      METHOD process_with_progress.
        mv_total = lines( it_items ).
        mv_completed = 0.
        mv_errors = 0.

        DATA(lv_task_no) = 0.

        " Initialize the progress indicator
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 0
            text       = |Starting { mv_total } parallel tasks...|.

        " Start the tasks
        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROG{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_progress_update ON END OF TASK
            EXPORTING
              iv_item = lv_item.
        ENDLOOP.

        " Wait and refresh the progress once per second
        WHILE mv_completed < mv_total.
          WAIT UNTIL mv_completed >= mv_total UP TO 1 SECONDS.
          update_progress( ).
        ENDWHILE.

        " Completion
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 100
            text       = |Finished: { mv_completed - mv_errors } OK, { mv_errors } errors|.
      ENDMETHOD.

      METHOD on_progress_update.
        " Count the task as completed, and as an error if the results cannot be fetched
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        IF sy-subrc <> 0.
          mv_errors = mv_errors + 1.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.

      METHOD update_progress.
        DATA(lv_percent) = mv_completed * 100 DIV mv_total.

        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = lv_percent
            text       = |{ mv_completed }/{ mv_total } processed ({ mv_errors } errors)|.
      ENDMETHOD.
    ENDCLASS.

10. Map-Reduce Pattern
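The reduce phase of the pattern in this section sums values per key. When the key is a flat character field and the remaining components are numeric, the COLLECT statement performs this aggregation directly. The sketch below assumes a hypothetical row type with a 20-character word and an integer total; the class and report names are illustrative only.

```abap
REPORT zdemo_reduce_collect.

CLASS lcl_reduce DEFINITION FINAL.
  PUBLIC SECTION.
    TYPES: BEGIN OF ty_count,
             word  TYPE c LENGTH 20,
             total TYPE i,
           END OF ty_count,
           ty_counts TYPE STANDARD TABLE OF ty_count WITH DEFAULT KEY.

    CLASS-METHODS sum_by_word
      IMPORTING it_pairs         TYPE ty_counts
      RETURNING VALUE(rt_totals) TYPE ty_counts.
ENDCLASS.

CLASS lcl_reduce IMPLEMENTATION.
  METHOD sum_by_word.
    " COLLECT adds the numeric component to an existing row with the
    " same key, or inserts a new row if the key is not present yet
    LOOP AT it_pairs INTO DATA(ls_pair).
      COLLECT ls_pair INTO rt_totals.
    ENDLOOP.
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
  DATA(lt_totals) = lcl_reduce=>sum_by_word( VALUE #(
    ( word = 'apple' total = 1 )
    ( word = 'pear'  total = 2 )
    ( word = 'apple' total = 3 ) ) ).

  LOOP AT lt_totals INTO DATA(ls_total).
    WRITE: / ls_total-word, ls_total-total.
  ENDLOOP.
```

With the sample input, the result has two rows: apple with a total of 4 and pear with a total of 2.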
    CLASS zcl_map_reduce DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_key_value,
                 key   TYPE string,
                 value TYPE i,
               END OF ty_key_value,
               ty_key_values TYPE STANDARD TABLE OF ty_key_value WITH KEY key.

        METHODS map_reduce
          IMPORTING it_data          TYPE string_table
          RETURNING VALUE(rt_result) TYPE ty_key_values.

        " Callback: collects the key/value pairs produced by one map task
        METHODS on_map_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_map_results TYPE ty_key_values,
              mv_map_done    TYPE i,
              mv_map_total   TYPE i.
    ENDCLASS.

    CLASS zcl_map_reduce IMPLEMENTATION.
      METHOD map_reduce.
        " MAP phase: parallel
        mv_map_total = lines( it_data ).
        mv_map_done = 0.
        CLEAR mt_map_results.

        DATA(lv_chunk_no) = 0.
        LOOP AT it_data INTO DATA(lv_data).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |MAP{ lv_chunk_no }|.

          CALL FUNCTION 'Z_MAP_FUNCTION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_map_complete ON END OF TASK
            EXPORTING
              iv_data = lv_data.
        ENDLOOP.

        " Wait for all map tasks
        WAIT UNTIL mv_map_done >= mv_map_total UP TO 300 SECONDS.

        " REDUCE phase: local (aggregate by key)
        DATA lt_grouped TYPE SORTED TABLE OF ty_key_value WITH UNIQUE KEY key.

        LOOP AT mt_map_results INTO DATA(ls_result).
          READ TABLE lt_grouped ASSIGNING FIELD-SYMBOL(<fs_group>)
            WITH TABLE KEY key = ls_result-key.
          IF sy-subrc = 0.
            <fs_group>-value = <fs_group>-value + ls_result-value.
          ELSE.
            INSERT ls_result INTO TABLE lt_grouped.
          ENDIF.
        ENDLOOP.

        rt_result = lt_grouped.
      ENDMETHOD.

      METHOD on_map_complete.
        DATA lt_pairs TYPE ty_key_values.

        " et_pairs is an assumed export parameter of Z_MAP_FUNCTION
        RECEIVE RESULTS FROM FUNCTION 'Z_MAP_FUNCTION'
          IMPORTING
            et_pairs = lt_pairs
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        IF sy-subrc = 0.
          APPEND LINES OF lt_pairs TO mt_map_results.
        ENDIF.

        mv_map_done = mv_map_done + 1.
      ENDMETHOD.
    ENDCLASS.

11. Transactional Parallel Processing
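    " Note: every aRFC task runs in its own session with its own LUW. The commit
    " or rollback at the end only affects changes made in this (calling) session,
    " so the tasks should validate or prepare data and return it to the caller
    " rather than writing to the database themselves.
    CLASS zcl_transactional_parallel DEFINITION.
      PUBLIC SECTION.
        METHODS process_and_commit
          IMPORTING it_items          TYPE ty_item_tab
          RETURNING VALUE(rv_success) TYPE abap_bool.

        " Callback for ON END OF TASK
        METHODS on_trans_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_all_success TYPE abap_bool VALUE abap_true,
              mv_completed   TYPE i,
              mv_total       TYPE i.
    ENDCLASS.

    CLASS zcl_transactional_parallel IMPLEMENTATION.
      METHOD process_and_commit.
        mv_total = lines( it_items ).
        mv_completed = 0.
        mv_all_success = abap_true.

        DATA(lv_task_no) = 0.

        " Parallel processing (without commit)
        LOOP AT it_items INTO DATA(ls_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |TRANS{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_NO_COMMIT'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_trans_complete ON END OF TASK
            EXPORTING
              is_item = ls_item.
        ENDLOOP.

        " Wait for all tasks; a timeout counts as a failure
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
        IF mv_completed < mv_total.
          mv_all_success = abap_false.
        ENDIF.

        " Commit or rollback
        IF mv_all_success = abap_true.
          CALL FUNCTION 'BAPI_TRANSACTION_COMMIT'
            EXPORTING
              wait = abap_true.
          rv_success = abap_true.
        ELSE.
          CALL FUNCTION 'BAPI_TRANSACTION_ROLLBACK'.
          rv_success = abap_false.
        ENDIF.
      ENDMETHOD.

      METHOD on_trans_complete.
        DATA lv_ok TYPE abap_bool.

        " ev_success is an assumed export parameter of Z_PROCESS_NO_COMMIT
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_NO_COMMIT'
          IMPORTING
            ev_success = lv_ok
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        IF sy-subrc <> 0 OR lv_ok = abap_false.
          mv_all_success = abap_false.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

12. Practical Example: Mass Data Export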
    " ty_customer_tab and ty_kunnr_tab are assumed to be defined elsewhere
    CLASS zcl_mass_export DEFINITION.
      PUBLIC SECTION.
        METHODS export_customers_parallel
          IMPORTING iv_country     TYPE land1
          RETURNING VALUE(rv_file) TYPE string.

        " Callback for ON END OF TASK
        METHODS on_chunk_loaded
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        CONSTANTS c_chunk_size TYPE i VALUE 1000.

        DATA: mt_all_data  TYPE ty_customer_tab,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_mass_export IMPLEMENTATION.
      METHOD export_customers_parallel.
        " Load the customer numbers
        SELECT kunnr FROM kna1
          WHERE land1 = @iv_country
          INTO TABLE @DATA(lt_customers).

        " Number of chunks: rounded-up quotient, so that it matches the
        " number of tasks actually started below
        mv_total = ( lines( lt_customers ) + c_chunk_size - 1 ) DIV c_chunk_size.
        mv_completed = 0.
        CLEAR mt_all_data.

        " Load the chunks in parallel
        DATA: lv_from     TYPE i VALUE 1,
              lv_chunk_no TYPE i VALUE 0.

        WHILE lv_from <= lines( lt_customers ).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |EXP{ lv_chunk_no }|.

          " Calculate the range of the chunk
          DATA(lv_to) = lv_from + c_chunk_size - 1.
          IF lv_to > lines( lt_customers ).
            lv_to = lines( lt_customers ).
          ENDIF.

          " Customer numbers for the chunk
          DATA(lt_chunk_keys) = VALUE ty_kunnr_tab(
            FOR i = lv_from WHILE i <= lv_to ( lt_customers[ i ]-kunnr ) ).

          " Load in parallel
          CALL FUNCTION 'Z_LOAD_CUSTOMER_DATA'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_chunk_loaded ON END OF TASK
            EXPORTING
              it_customers = lt_chunk_keys.

          lv_from = lv_to + 1.
        ENDWHILE.

        " Wait
        WAIT UNTIL mv_completed >= mv_total UP TO 600 SECONDS.

        " Write the file
        rv_file = |/tmp/customers_{ iv_country }_{ sy-datum }.csv|.

        OPEN DATASET rv_file FOR OUTPUT IN TEXT MODE ENCODING UTF-8.
        IF sy-subrc = 0.
          TRANSFER 'KUNNR;NAME1;ORT01;LAND1' TO rv_file.
          LOOP AT mt_all_data INTO DATA(ls_data).
            DATA(lv_line) = |{ ls_data-kunnr };{ ls_data-name1 };{ ls_data-ort01 };{ ls_data-land1 }|.
            TRANSFER lv_line TO rv_file.
          ENDLOOP.
          CLOSE DATASET rv_file.
        ENDIF.
      ENDMETHOD.

      METHOD on_chunk_loaded.
        DATA lt_data TYPE ty_customer_tab.

        " et_data is an assumed export parameter of Z_LOAD_CUSTOMER_DATA
        RECEIVE RESULTS FROM FUNCTION 'Z_LOAD_CUSTOMER_DATA'
          IMPORTING
            et_data = lt_data
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        IF sy-subrc = 0.
          APPEND LINES OF lt_data TO mt_all_data.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

Destination Options
| Destination | Description |
|---|---|
| 'NONE' | Local server, same instance |
| IN GROUP group | Use a named server group |
| IN GROUP DEFAULT | Default group (all currently available application servers) |
| RFC destination | External system |
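With DESTINATION IN GROUP, starting a task can fail with the exception resource_failure when no work process is currently free. A common reaction is to wait briefly and try again. The following sketch does this for the standard function module RFC_SYSTEM_INFO; the report name, the task name GRP1, the retry count of 5 and the one-second pause are arbitrary assumptions for this example.

```abap
REPORT zdemo_arfc_group_retry.

DATA: gs_info TYPE rfcsi,
      gv_done TYPE abap_bool,
      gv_msg  TYPE c LENGTH 255,
      gv_rc   TYPE sy-subrc.

START-OF-SELECTION.
  DO 5 TIMES.
    CALL FUNCTION 'RFC_SYSTEM_INFO'
      STARTING NEW TASK 'GRP1'
      DESTINATION IN GROUP DEFAULT
      PERFORMING on_done ON END OF TASK
      EXCEPTIONS
        communication_failure = 1 MESSAGE gv_msg
        system_failure        = 2 MESSAGE gv_msg
        resource_failure      = 3.
    gv_rc = sy-subrc. " save it: WAIT below overwrites sy-subrc

    IF gv_rc <> 3.
      EXIT.           " started successfully, or failed for another reason
    ENDIF.

    " No free resources right now: pause, then retry
    WAIT UP TO 1 SECONDS.
  ENDDO.

  IF gv_rc <> 0.
    WRITE: / 'Task could not be started, return code:', gv_rc.
    RETURN.
  ENDIF.

  WAIT UNTIL gv_done = abap_true UP TO 30 SECONDS.
  IF gv_done = abap_true.
    WRITE: / 'Executed on host:', gs_info-rfchost.
  ELSE.
    WRITE: / 'Timeout'.
  ENDIF.

FORM on_done USING p_task TYPE clike.
  RECEIVE RESULTS FROM FUNCTION 'RFC_SYSTEM_INFO'
    IMPORTING
      rfcsi_export = gs_info
    EXCEPTIONS
      communication_failure = 1 MESSAGE gv_msg
      system_failure        = 2 MESSAGE gv_msg.
  gv_done = abap_true.
ENDFORM.
```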
WAIT Variants

    " Wait for a condition
    WAIT UNTIL gv_done = abap_true.

    " With a timeout
    WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

    " Explicit long form: wait until at least one task has reported back
    WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_count > 0.

    " Multiple conditions
    WAIT UNTIL gv_task1_done = abap_true
           AND gv_task2_done = abap_true
           UP TO 120 SECONDS.

Important Notes / Best Practices
- Match the number of parallel tasks to the available work processes.
- Use SPBT_INITIALIZE to set up the server group and query its free work processes.
- Handle errors in RECEIVE RESULTS with EXCEPTIONS.
- Always define a timeout (UP TO … SECONDS).
- Avoid database locks between tasks to prevent deadlocks.
- The callback form or method must exist at runtime.
- Tune the data volume per task (neither too large nor too small).
- RFC function modules must be remote-enabled.
- In the calling program, issue COMMIT WORK only after all tasks have completed.
- Combine with the HTTP client for parallel API calls.
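On the timeout advice above: when a program loops around WAIT ... UP TO to enforce an overall limit, the elapsed time is more safely derived from time stamps than from the difference of two sy-uzeit values, which breaks when the run crosses midnight. A minimal sketch using the standard class CL_ABAP_TSTMP follows; the helper class lcl_timer and the report name are hypothetical.

```abap
REPORT zdemo_elapsed_seconds.

CLASS lcl_timer DEFINITION FINAL.
  PUBLIC SECTION.
    CLASS-METHODS seconds_between
      IMPORTING iv_start          TYPE timestamp
                iv_end            TYPE timestamp
      RETURNING VALUE(rv_seconds) TYPE i.
ENDCLASS.

CLASS lcl_timer IMPLEMENTATION.
  METHOD seconds_between.
    " subtract returns the difference tstmp1 - tstmp2 in seconds
    rv_seconds = cl_abap_tstmp=>subtract( tstmp1 = iv_end
                                          tstmp2 = iv_start ).
  ENDMETHOD.
ENDCLASS.

START-OF-SELECTION.
  DATA: lv_start TYPE timestamp,
        lv_now   TYPE timestamp.

  GET TIME STAMP FIELD lv_start.
  WAIT UP TO 2 SECONDS.          " stands in for the real waiting loop
  GET TIME STAMP FIELD lv_now.

  WRITE: / |Elapsed seconds: { lcl_timer=>seconds_between( iv_start = lv_start iv_end = lv_now ) }|.
```

Inside a waiting loop, the same call can be compared against the allowed maximum after each WAIT ... UP TO interval.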