Parallel processing in ABAP runs tasks simultaneously in several work processes. With asynchronous RFC calls (aRFC), started with CALL FUNCTION ... STARTING NEW TASK, data-intensive operations can be sped up significantly.
Basic concept
| Statement | Description |
|---|---|
| STARTING NEW TASK | Starts an asynchronous RFC |
| CALLING ... ON END OF TASK | Registers a callback method that runs when the task ends (use PERFORMING ... ON END OF TASK for a subroutine) |
| WAIT UNTIL | Waits for the results |
| RECEIVE RESULTS | Receives the return values |
Syntax

    " Asynchronous call
    CALL FUNCTION 'FUNC_NAME'
      STARTING NEW TASK task_name
      DESTINATION destination
      CALLING callback_method ON END OF TASK
      EXPORTING
        param = value
      TABLES
        itab  = lt_data.

    " Wait for the results
    WAIT UNTIL lv_counter >= lv_expected.

    " Fetch the results (inside the callback)
    RECEIVE RESULTS FROM FUNCTION 'FUNC_NAME'
      IMPORTING
        result = lv_result
      TABLES
        itab   = lt_result.

Examples
1. Simple asynchronous call

    REPORT zparallel_simple.

    DATA: gv_result TYPE string,
          gv_done   TYPE abap_bool.

    START-OF-SELECTION.
      " Start an asynchronous task.
      " A subroutine callback is registered with PERFORMING (CALLING is for methods).
      CALL FUNCTION 'Z_LONG_RUNNING_TASK'
        STARTING NEW TASK 'TASK1'
        DESTINATION 'NONE'
        PERFORMING on_task_complete ON END OF TASK
        EXPORTING
          iv_input = 'Test'.

      " Wait for the result
      WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

      IF gv_done = abap_true.
        WRITE: / 'Result:', gv_result.
      ELSE.
        WRITE: / 'Timeout!'.
      ENDIF.

    FORM on_task_complete USING p_task TYPE clike.
      RECEIVE RESULTS FROM FUNCTION 'Z_LONG_RUNNING_TASK'
        IMPORTING
          ev_result = gv_result.
      gv_done = abap_true.
    ENDFORM.

2. Parallel processing with a class

    CLASS zcl_parallel_processor DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_task,
                 id     TYPE i,
                 status TYPE string,
                 result TYPE string,
               END OF ty_task,
               ty_tasks TYPE STANDARD TABLE OF ty_task WITH KEY id.

        METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_tasks.

        " Callback for ON END OF TASK: public, with the single parameter p_task
        METHODS on_task_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_tasks     TYPE ty_tasks,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_processor IMPLEMENTATION.
      METHOD run_parallel.
        mv_total     = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_tasks.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          " The task name is passed as a data object and must be unique
          DATA(lv_task_name) = |TASK{ lv_task_id }|.

          APPEND VALUE #( id = lv_task_id status = 'RUNNING' ) TO mt_tasks.

          " Asynchronous call
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_task_complete ON END OF TASK
            EXPORTING
              iv_item    = lv_item
              iv_task_id = lv_task_id.
        ENDLOOP.

        " Wait until all tasks have finished
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.

        rt_results = mt_tasks.
      ENDMETHOD.

      METHOD on_task_complete.
        DATA: lv_result  TYPE string,
              lv_task_id TYPE i.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          IMPORTING
            ev_result  = lv_result
            ev_task_id = lv_task_id.

        " Store the result
        MODIFY mt_tasks FROM VALUE #( id     = lv_task_id
                                      status = 'COMPLETED'
                                      result = lv_result )
               TRANSPORTING status result
               WHERE id = lv_task_id.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

3. Batch processing with chunks

    CLASS zcl_batch_parallel DEFINITION.
      PUBLIC SECTION.
        CONSTANTS c_max_tasks TYPE i VALUE 10.

        METHODS process_in_batches
          IMPORTING it_data           TYPE ty_data_tab
          RETURNING VALUE(rt_results) TYPE ty_result_tab.

        " Callback methods must be public
        METHODS on_batch_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_active_tasks TYPE i,   " number of tasks started so far
              mv_completed    TYPE i,
              mt_results      TYPE ty_result_tab.
    ENDCLASS.

    CLASS zcl_batch_parallel IMPLEMENTATION.
      METHOD process_in_batches.
        DATA: lt_chunk      TYPE ty_data_tab,
              lv_chunk_size TYPE i,
              lv_task_no    TYPE i.

        " Compute the chunk size (note: "/" rounds to the nearest integer in ABAP)
        lv_chunk_size = COND #( WHEN lines( it_data ) <= c_max_tasks
                                THEN 1
                                ELSE lines( it_data ) / c_max_tasks + 1 ).

        mv_active_tasks = 0.
        mv_completed    = 0.
        CLEAR mt_results.

        " Split the data into chunks and process them in parallel
        DATA(lv_start) = 1.
        WHILE lv_start <= lines( it_data ).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |BATCH{ lv_task_no }|.

          " Build a chunk
          CLEAR lt_chunk.
          LOOP AT it_data INTO DATA(ls_data) FROM lv_start.
            APPEND ls_data TO lt_chunk.
            IF lines( lt_chunk ) >= lv_chunk_size.
              EXIT.
            ENDIF.
          ENDLOOP.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_BATCH'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_batch_complete ON END OF TASK
            EXPORTING
              iv_batch_id = lv_task_no
            TABLES
              it_data     = lt_chunk.

          mv_active_tasks = mv_active_tasks + 1.
          lv_start        = lv_start + lv_chunk_size.

          " Throttle: wait while c_max_tasks tasks are still open
          IF mv_active_tasks - mv_completed >= c_max_tasks.
            WAIT UNTIL mv_active_tasks - mv_completed < c_max_tasks UP TO 60 SECONDS.
          ENDIF.
        ENDWHILE.

        " Wait for all remaining tasks
        WAIT UNTIL mv_active_tasks = mv_completed UP TO 300 SECONDS.

        rt_results = mt_results.
      ENDMETHOD.

      METHOD on_batch_complete.
        DATA lt_batch_results TYPE ty_result_tab.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_BATCH'
          TABLES
            et_results = lt_batch_results.

        APPEND LINES OF lt_batch_results TO mt_results.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

4. Using a server group

    DATA: lv_group    TYPE rzlli_apts VALUE 'parallel_generators',
          lv_free_wps TYPE i.

    " Initialize the server group.
    " Inline declarations are not possible in CALL FUNCTION, hence lv_free_wps above.
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = lv_group
      IMPORTING
        free_pbt_wps                   = lv_free_wps
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.

    IF sy-subrc <> 0 AND sy-subrc <> 3.
      " Group not usable: fall back to the default.
      " An initial group name should behave like IN GROUP DEFAULT (all available servers).
      CLEAR lv_group.
    ENDIF.

    " Call with the server group; lv_param is assumed to be declared elsewhere.
    " Outside a class the callback is a subroutine, registered with PERFORMING.
    CALL FUNCTION 'Z_HEAVY_CALCULATION'
      STARTING NEW TASK 'CALC1'
      DESTINATION IN GROUP lv_group
      PERFORMING on_calculation_done ON END OF TASK
      EXPORTING
        iv_param = lv_param.

5. Resource management

    CLASS zcl_resource_manager DEFINITION.
      PUBLIC SECTION.
        METHODS get_available_processes
          RETURNING VALUE(rv_count) TYPE i.

        METHODS process_with_throttling
          IMPORTING it_items TYPE string_table.

        " Callback (must be public): counts finished tasks
        METHODS on_item_done
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_max_parallel TYPE i VALUE 5,
              mv_running      TYPE i,   " number of tasks started so far
              mv_completed    TYPE i.
    ENDCLASS.

    CLASS zcl_resource_manager IMPLEMENTATION.
      METHOD get_available_processes.
        " Determine the free work processes
        CALL FUNCTION 'SPBT_INITIALIZE'
          EXPORTING
            group_name   = 'parallel_generators'
          IMPORTING
            free_pbt_wps = rv_count
          EXCEPTIONS
            OTHERS       = 1.

        IF sy-subrc <> 0.
          rv_count = 3.   " Fallback
        ENDIF.

        " Cap at the configured maximum
        IF rv_count > mv_max_parallel.
          rv_count = mv_max_parallel.
        ENDIF.
      ENDMETHOD.

      METHOD process_with_throttling.
        DATA(lv_max) = get_available_processes( ).
        mv_running   = 0.
        mv_completed = 0.

        DATA(lv_task_no) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROC{ lv_task_no }|.

          " Wait while the maximum number of open tasks is reached
          WHILE mv_running - mv_completed >= lv_max.
            WAIT UNTIL mv_completed > ( mv_running - lv_max ) UP TO 5 SECONDS.
          ENDWHILE.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_item_done ON END OF TASK
            EXPORTING
              iv_item = lv_item.

          mv_running = mv_running + 1.
        ENDLOOP.

        " Wait for all tasks
        WAIT UNTIL mv_completed >= mv_running UP TO 600 SECONDS.
      ENDMETHOD.

      METHOD on_item_done.
        " Fetch the result; errors are ignored in this example
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

6. Error handling

    CLASS zcl_parallel_with_errors DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_result,
                 task_id TYPE i,
                 success TYPE abap_bool,
                 message TYPE string,
                 data    TYPE string,
               END OF ty_result,
               ty_results TYPE STANDARD TABLE OF ty_result WITH KEY task_id.

        METHODS process_with_error_handling
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_results.

        " Callback with error handling (a public method, so it can access the attributes)
        METHODS on_risky_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_results   TYPE ty_results,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_with_errors IMPLEMENTATION.
      METHOD process_with_error_handling.
        DATA lv_message TYPE c LENGTH 255.

        mv_total     = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_results.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          DATA(lv_task_name) = |RISK{ lv_task_id }|.
          CLEAR lv_message.

          " Start errors are classic exceptions: handle them with EXCEPTIONS and sy-subrc
          CALL FUNCTION 'Z_RISKY_OPERATION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_risky_complete ON END OF TASK
            EXPORTING
              iv_item               = lv_item
              iv_task_id            = lv_task_id
            EXCEPTIONS
              communication_failure = 1 MESSAGE lv_message
              system_failure        = 2 MESSAGE lv_message
              resource_failure      = 3.

          IF sy-subrc <> 0.
            " Record the start error
            APPEND VALUE #( task_id = lv_task_id
                            success = abap_false
                            message = |Start failed ({ sy-subrc }): { lv_message }| )
                   TO mt_results.
            mv_completed = mv_completed + 1.
          ENDIF.
        ENDLOOP.

        " Wait with a timeout of 5 minutes
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.

        IF sy-subrc <> 0.
          " Record every task that has not reported back as a timeout
          DO mv_total TIMES.
            IF NOT line_exists( mt_results[ task_id = sy-index ] ).
              APPEND VALUE #( task_id = sy-index
                              success = abap_false
                              message = 'Timeout' ) TO mt_results.
            ENDIF.
          ENDDO.
        ENDIF.

        rt_results = mt_results.
      ENDMETHOD.

      METHOD on_risky_complete.
        DATA: lv_result  TYPE string,
              lv_message TYPE c LENGTH 255.

        " Task names are RISK<n>, so the ID is available even if the call failed
        DATA(lv_task_id) = CONV i( substring( val = p_task off = 4 ) ).

        RECEIVE RESULTS FROM FUNCTION 'Z_RISKY_OPERATION'
          IMPORTING
            ev_result             = lv_result
          EXCEPTIONS
            communication_failure = 1 MESSAGE lv_message
            system_failure        = 2 MESSAGE lv_message
            OTHERS                = 3.

        IF sy-subrc = 0.
          APPEND VALUE #( task_id = lv_task_id
                          success = abap_true
                          data    = lv_result ) TO mt_results.
        ELSE.
          APPEND VALUE #( task_id = lv_task_id
                          success = abap_false
                          message = COND #( WHEN lv_message IS NOT INITIAL
                                            THEN lv_message
                                            ELSE |Error { sy-subrc }| ) ) TO mt_results.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

7. RFC function module for parallelization

    FUNCTION z_parallel_worker.
    *"----------------------------------------------------------------------
    *"*"Local interface:
    *"  IMPORTING
    *"     VALUE(IV_TASK_ID) TYPE  I
    *"     VALUE(IT_DATA) TYPE  TY_DATA_TAB
    *"  EXPORTING
    *"     VALUE(ET_RESULTS) TYPE  TY_RESULT_TAB
    *"     VALUE(EV_TASK_ID) TYPE  I
    *"----------------------------------------------------------------------

      ev_task_id = iv_task_id.

      LOOP AT it_data INTO DATA(ls_data).
        " Processing
        DATA(ls_result) = VALUE ty_result( id     = ls_data-id
                                           status = 'PROCESSED'
                                           value  = ls_data-value * 2 ).
        APPEND ls_result TO et_results.
      ENDLOOP.

    ENDFUNCTION.

8. CL_ABAP_PARALLEL for modern parallelization

    " Sketch of the inheritance variant of CL_ABAP_PARALLEL: redefine DO and call RUN.
    " Check the method and type names against the class in your release.
    CLASS zcl_modern_parallel DEFINITION
      INHERITING FROM cl_abap_parallel.
      PUBLIC SECTION.
        METHODS do REDEFINITION.

        CLASS-METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE string_table.
    ENDCLASS.

    CLASS zcl_modern_parallel IMPLEMENTATION.
      METHOD do.
        " This method is executed in parallel, once per input entry
        DATA lv_item TYPE string.
        IMPORT item = lv_item FROM DATA BUFFER p_in.

        DATA(lv_result) = |Processed: { lv_item }|.

        " Serialize the result
        EXPORT result = lv_result TO DATA BUFFER p_out.
      ENDMETHOD.

      METHOD run_parallel.
        DATA: lt_in     TYPE cl_abap_parallel=>t_in_tab,
              lt_out    TYPE cl_abap_parallel=>t_out_tab,
              lv_buffer TYPE xstring,
              lv_result TYPE string.

        " Serialize one input buffer per item
        LOOP AT it_items INTO DATA(lv_item).
          EXPORT item = lv_item TO DATA BUFFER lv_buffer.
          APPEND lv_buffer TO lt_in.
        ENDLOOP.

        " Run in parallel
        NEW zcl_modern_parallel( )->run(
          EXPORTING p_in_tab  = lt_in
          IMPORTING p_out_tab = lt_out ).

        " Collect the results
        LOOP AT lt_out INTO DATA(ls_out).
          IF ls_out-result IS NOT INITIAL.
            IMPORT result = lv_result FROM DATA BUFFER ls_out-result.
            IF sy-subrc = 0.
              APPEND lv_result TO rt_results.
            ENDIF.
          ENDIF.
        ENDLOOP.
      ENDMETHOD.
    ENDCLASS.

9. Progress indicator

    CLASS zcl_parallel_progress DEFINITION.
      PUBLIC SECTION.
        METHODS process_with_progress
          IMPORTING it_items TYPE string_table.

        " Callback (must be public): counts finished and failed tasks
        METHODS on_progress_update
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_total     TYPE i,
              mv_completed TYPE i,
              mv_errors    TYPE i.

        METHODS update_progress.
    ENDCLASS.

    CLASS zcl_parallel_progress IMPLEMENTATION.
      METHOD process_with_progress.
        mv_total     = lines( it_items ).
        mv_completed = 0.
        mv_errors    = 0.

        DATA(lv_task_no) = 0.

        " Initialize the progress indicator
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 0
            text       = |Starting { mv_total } parallel tasks...|.

        " Start the tasks
        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROG{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_progress_update ON END OF TASK
            EXPORTING
              iv_item = lv_item.
        ENDLOOP.

        " Wait and refresh the progress display
        WHILE mv_completed < mv_total.
          WAIT UNTIL mv_completed >= mv_total UP TO 1 SECONDS.
          IF sy-subrc = 4.
            EXIT.   " no asynchronous calls are open any more
          ENDIF.
          update_progress( ).
        ENDWHILE.

        " Final status
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 100
            text       = |Done: { mv_completed - mv_errors } OK, { mv_errors } errors|.
      ENDMETHOD.

      METHOD on_progress_update.
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.
        IF sy-subrc <> 0.
          mv_errors = mv_errors + 1.
        ENDIF.
        mv_completed = mv_completed + 1.
      ENDMETHOD.

      METHOD update_progress.
        DATA(lv_percent) = mv_completed * 100 / mv_total.

        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = lv_percent
            text       = |{ mv_completed }/{ mv_total } processed ({ mv_errors } errors)|.
      ENDMETHOD.
    ENDCLASS.

10. Map-reduce pattern

    CLASS zcl_map_reduce DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_key_value,
                 key   TYPE string,
                 value TYPE i,
               END OF ty_key_value,
               ty_key_values TYPE STANDARD TABLE OF ty_key_value WITH KEY key.

        METHODS map_reduce
          IMPORTING it_data          TYPE string_table
          RETURNING VALUE(rt_result) TYPE ty_key_values.

        " Callback (must be public): collects the map results
        METHODS on_map_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_map_results TYPE ty_key_values,
              mv_map_done    TYPE i,
              mv_map_total   TYPE i.
    ENDCLASS.

    CLASS zcl_map_reduce IMPLEMENTATION.
      METHOD map_reduce.
        " MAP phase: parallel
        mv_map_total = lines( it_data ).
        mv_map_done  = 0.
        CLEAR mt_map_results.

        DATA(lv_chunk_no) = 0.
        LOOP AT it_data INTO DATA(lv_data).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |MAP{ lv_chunk_no }|.

          CALL FUNCTION 'Z_MAP_FUNCTION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_map_complete ON END OF TASK
            EXPORTING
              iv_data = lv_data.
        ENDLOOP.

        " Wait for all map tasks
        WAIT UNTIL mv_map_done >= mv_map_total UP TO 300 SECONDS.

        " REDUCE phase: local aggregation
        DATA lt_grouped TYPE SORTED TABLE OF ty_key_value WITH UNIQUE KEY key.

        LOOP AT mt_map_results INTO DATA(ls_result).
          READ TABLE lt_grouped ASSIGNING FIELD-SYMBOL(<fs_group>)
               WITH KEY key = ls_result-key.
          IF sy-subrc = 0.
            <fs_group>-value = <fs_group>-value + ls_result-value.
          ELSE.
            INSERT ls_result INTO TABLE lt_grouped.
          ENDIF.
        ENDLOOP.

        rt_result = lt_grouped.
      ENDMETHOD.

      METHOD on_map_complete.
        DATA lt_pairs TYPE ty_key_values.

        " Assumption: Z_MAP_FUNCTION returns its key/value pairs in ET_PAIRS
        RECEIVE RESULTS FROM FUNCTION 'Z_MAP_FUNCTION'
          IMPORTING
            et_pairs              = lt_pairs
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        APPEND LINES OF lt_pairs TO mt_map_results.
        mv_map_done = mv_map_done + 1.
      ENDMETHOD.
    ENDCLASS.

11. Transactional parallel processing
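
    CLASS zcl_transactional_parallel DEFINITION.
      PUBLIC SECTION.
        METHODS process_and_commit
          IMPORTING it_items          TYPE ty_item_tab
          RETURNING VALUE(rv_success) TYPE abap_bool.

        " Callback (must be public): records whether a task succeeded
        METHODS on_trans_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_all_success TYPE abap_bool VALUE abap_true,
              mv_completed   TYPE i,
              mv_total       TYPE i.
    ENDCLASS.

    CLASS zcl_transactional_parallel IMPLEMENTATION.
      METHOD process_and_commit.
        mv_total       = lines( it_items ).
        mv_completed   = 0.
        mv_all_success = abap_true.

        DATA(lv_task_no) = 0.

        " Parallel processing (without commit)
        LOOP AT it_items INTO DATA(ls_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |TRANS{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_NO_COMMIT'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_trans_complete ON END OF TASK
            EXPORTING
              is_item = ls_item.
        ENDLOOP.

        " Wait for all tasks; a timeout counts as a failure
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
        IF mv_completed < mv_total.
          mv_all_success = abap_false.
        ENDIF.

        " Commit or rollback.
        " Caution: each asynchronous RFC runs in its own session with its own LUW,
        " so this only affects changes registered in the calling session.
        IF mv_all_success = abap_true.
          CALL FUNCTION 'BAPI_TRANSACTION_COMMIT'
            EXPORTING
              wait = abap_true.
          rv_success = abap_true.
        ELSE.
          CALL FUNCTION 'BAPI_TRANSACTION_ROLLBACK'.
          rv_success = abap_false.
        ENDIF.
      ENDMETHOD.

      METHOD on_trans_complete.
        DATA lv_success TYPE abap_bool.

        " Assumption: Z_PROCESS_NO_COMMIT reports its outcome in EV_SUCCESS
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_NO_COMMIT'
          IMPORTING
            ev_success            = lv_success
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        IF sy-subrc <> 0 OR lv_success = abap_false.
          mv_all_success = abap_false.
        ENDIF.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

12. Practical example: mass data export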

    CLASS zcl_mass_export DEFINITION.
      PUBLIC SECTION.
        METHODS export_customers_parallel
          IMPORTING iv_country     TYPE land1
          RETURNING VALUE(rv_file) TYPE string.

        " Callback (must be public): collects the rows of one chunk
        METHODS on_chunk_loaded
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        CONSTANTS c_chunk_size TYPE i VALUE 1000.

        DATA: mt_all_data  TYPE ty_customer_tab,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_mass_export IMPLEMENTATION.
      METHOD export_customers_parallel.
        " Load the customer numbers
        SELECT kunnr FROM kna1
          WHERE land1 = @iv_country
          INTO TABLE @DATA(lt_customers).

        " Number of chunks, rounded up (must match the number of tasks started below)
        mv_total     = ( lines( lt_customers ) + c_chunk_size - 1 ) DIV c_chunk_size.
        mv_completed = 0.
        CLEAR mt_all_data.

        " Load the chunks in parallel
        DATA: lv_from     TYPE i VALUE 1,
              lv_chunk_no TYPE i VALUE 0.

        WHILE lv_from <= lines( lt_customers ).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |EXP{ lv_chunk_no }|.

          " Compute the chunk range
          DATA(lv_to) = lv_from + c_chunk_size - 1.
          IF lv_to > lines( lt_customers ).
            lv_to = lines( lt_customers ).
          ENDIF.

          " Customer numbers for this chunk
          DATA(lt_chunk_keys) = VALUE ty_kunnr_tab(
            FOR i = lv_from WHILE i <= lv_to
            ( lt_customers[ i ]-kunnr ) ).

          " Load in parallel
          CALL FUNCTION 'Z_LOAD_CUSTOMER_DATA'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_chunk_loaded ON END OF TASK
            EXPORTING
              it_customers = lt_chunk_keys.

          lv_from = lv_to + 1.
        ENDWHILE.

        " Wait
        WAIT UNTIL mv_completed >= mv_total UP TO 600 SECONDS.

        " Write the file
        rv_file = |/tmp/customers_{ iv_country }_{ sy-datum }.csv|.

        OPEN DATASET rv_file FOR OUTPUT IN TEXT MODE ENCODING UTF-8.
        IF sy-subrc = 0.
          TRANSFER 'KUNNR;NAME1;ORT01;LAND1' TO rv_file.
          LOOP AT mt_all_data INTO DATA(ls_data).
            DATA(lv_line) = |{ ls_data-kunnr };{ ls_data-name1 };{ ls_data-ort01 };{ ls_data-land1 }|.
            TRANSFER lv_line TO rv_file.
          ENDLOOP.
          CLOSE DATASET rv_file.
        ENDIF.
      ENDMETHOD.

      METHOD on_chunk_loaded.
        DATA lt_data TYPE ty_customer_tab.

        " Assumption: Z_LOAD_CUSTOMER_DATA returns its rows in ET_DATA
        RECEIVE RESULTS FROM FUNCTION 'Z_LOAD_CUSTOMER_DATA'
          IMPORTING
            et_data               = lt_data
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        APPEND LINES OF lt_data TO mt_all_data.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

Destination options
| Destination | Description |
|---|---|
| 'NONE' | Local server, same instance |
| IN GROUP group | Use a server group |
| IN GROUP DEFAULT | Default group |
| RFC destination | External system |
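
As a rough sketch of how the group variant might be used, the report below starts one task in the default group and retries briefly when no work process is free. The function module `Z_PROCESS_ITEM`, its parameters `iv_item` and `ev_result`, the task name, and the subroutine `on_item_done` are hypothetical; `system_failure`, `communication_failure` and `resource_failure` are the predefined exceptions of asynchronous RFC.

```abap
REPORT zparallel_group_demo.

DATA: gv_started TYPE abap_bool,
      gv_done    TYPE abap_bool,
      gv_result  TYPE string,
      gv_msg     TYPE c LENGTH 255.

START-OF-SELECTION.
  DATA(gv_item) = `Test`.

  " Try up to three times to obtain a free work process in the group
  DO 3 TIMES.
    CALL FUNCTION 'Z_PROCESS_ITEM'        " hypothetical RFC-enabled module
      STARTING NEW TASK 'ITEM1'
      DESTINATION IN GROUP DEFAULT
      PERFORMING on_item_done ON END OF TASK
      EXPORTING
        iv_item               = gv_item
      EXCEPTIONS
        system_failure        = 1 MESSAGE gv_msg
        communication_failure = 2 MESSAGE gv_msg
        resource_failure      = 3.

    CASE sy-subrc.
      WHEN 0.
        gv_started = abap_true.
        EXIT.                              " leave the DO loop
      WHEN 3.
        WAIT UP TO 1 SECONDS.              " no free work process: retry
      WHEN OTHERS.
        WRITE: / 'Start failed:', gv_msg.
        EXIT.
    ENDCASE.
  ENDDO.

  IF gv_started = abap_true.
    WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.
    IF gv_done = abap_true.
      WRITE: / 'Result:', gv_result.
    ELSE.
      WRITE: / 'Timeout!'.
    ENDIF.
  ENDIF.

FORM on_item_done USING p_task TYPE clike.
  RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
    IMPORTING
      ev_result             = gv_result
    EXCEPTIONS
      system_failure        = 1 MESSAGE gv_msg
      communication_failure = 2 MESSAGE gv_msg.
  gv_done = abap_true.
ENDFORM.
```

Handling `resource_failure` only makes sense together with `DESTINATION IN GROUP`; the retry count and the one-second pause are arbitrary values chosen for illustration.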
WAIT variants

    " Wait for a condition
    WAIT UNTIL gv_done = abap_true.

    " With a timeout
    WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

    " Explicit long form: wait until any task has reported back
    WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_count > 0.

    " Several conditions
    WAIT UNTIL gv_task1_done = abap_true AND gv_task2_done = abap_true UP TO 120 SECONDS.

Important notes / best practices
- Match the number of parallel tasks to the available work processes.
- Call SPBT_INITIALIZE to use the server group configuration.
- Handle errors in RECEIVE RESULTS with EXCEPTIONS.
- Always set a timeout (UP TO … SECONDS).
- Do not hold database locks across several tasks, to avoid deadlocks.
- The callback subroutine or method must exist at runtime.
- Tune the data volume per task (neither too large nor too small).
- RFC function modules must be remote-enabled.
- Issue COMMIT WORK only after all tasks have finished.
- Combine with the HTTP client for parallel API calls.
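
To illustrate the timeout and error-handling points, here is a minimal sketch that evaluates `sy-subrc` after the wait: 0 means the condition became true, 4 means no asynchronous calls were open, and 8 means the time limit was exceeded. `Z_PROCESS_ITEM`, its parameter `iv_item`, the task names, and the subroutine `on_done` are hypothetical. The long form `WAIT FOR ASYNCHRONOUS TASKS` assumes a reasonably recent release; on older systems the short form `WAIT UNTIL` would take its place.

```abap
REPORT zparallel_timeout_demo.

DATA: gv_started   TYPE i,
      gv_completed TYPE i,
      gv_failed    TYPE i.

START-OF-SELECTION.
  DO 3 TIMES.
    DATA(gv_task_name) = |T{ sy-index }|.
    DATA(gv_item)      = |Item { sy-index }|.

    CALL FUNCTION 'Z_PROCESS_ITEM'        " hypothetical RFC-enabled module
      STARTING NEW TASK gv_task_name
      DESTINATION 'NONE'
      PERFORMING on_done ON END OF TASK
      EXPORTING
        iv_item               = gv_item
      EXCEPTIONS
        communication_failure = 1
        system_failure        = 2
        resource_failure      = 3.

    " Count only the tasks that were really started
    IF sy-subrc = 0.
      gv_started = gv_started + 1.
    ENDIF.
  ENDDO.

  WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_completed >= gv_started UP TO 60 SECONDS.

  CASE sy-subrc.
    WHEN 0.
      WRITE: / 'All tasks finished, failed:', gv_failed.
    WHEN 4.
      WRITE: / 'No asynchronous calls were open.'.
    WHEN 8.
      DATA(gv_open) = gv_started - gv_completed.
      WRITE: / 'Timeout, tasks still open:', gv_open.
  ENDCASE.

FORM on_done USING p_task TYPE clike.
  " Without EXCEPTIONS, a failed task would end the program with a runtime error
  RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
    EXCEPTIONS
      communication_failure = 1
      system_failure        = 2
      OTHERS                = 3.
  IF sy-subrc <> 0.
    gv_failed = gv_failed + 1.
  ENDIF.
  gv_completed = gv_completed + 1.
ENDFORM.
```

Counting only successfully started tasks keeps the wait condition reachable even when some starts fail.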