ABAP Parallel Processing: aRFC, STARTING NEW TASK

Category: ABAP-Statements
Author: Johannes

Parallel processing in ABAP distributes work across multiple work processes so that tasks run simultaneously. With asynchronous RFC (aRFC) calls started via STARTING NEW TASK, data-intensive operations can be accelerated significantly.

Basic Concept

Statement                     Description
STARTING NEW TASK             Starts asynchronous RFC
CALLING ... ON END OF TASK    Callback on completion
WAIT UNTIL                    Waits for results
RECEIVE RESULTS               Receives return values

Syntax

" Asynchronous call
CALL FUNCTION 'FUNC_NAME' STARTING NEW TASK task_name
DESTINATION destination
CALLING callback_method ON END OF TASK " for a subroutine: PERFORMING callback_form ON END OF TASK
EXPORTING
param = value
TABLES
itab = lt_data.
" Wait for results
WAIT UNTIL lv_counter >= lv_expected.
" Retrieve results (in callback)
RECEIVE RESULTS FROM FUNCTION 'FUNC_NAME'
IMPORTING
result = lv_result
TABLES
itab = lt_result.

Examples

1. Simple Asynchronous Call

REPORT zparallel_simple.
DATA: gv_result TYPE string,
gv_done TYPE abap_bool.
START-OF-SELECTION.
" Start asynchronous task
CALL FUNCTION 'Z_LONG_RUNNING_TASK' STARTING NEW TASK 'TASK1'
DESTINATION 'NONE'
PERFORMING on_task_complete ON END OF TASK " callback is a FORM, so PERFORMING instead of CALLING
EXPORTING
iv_input = 'Test'.
" Wait for result
WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.
IF gv_done = abap_true.
WRITE: / 'Result:', gv_result.
ELSE.
WRITE: / 'Timeout!'.
ENDIF.
FORM on_task_complete USING p_task TYPE clike.
RECEIVE RESULTS FROM FUNCTION 'Z_LONG_RUNNING_TASK'
IMPORTING
ev_result = gv_result.
gv_done = abap_true.
ENDFORM.

2. Parallel Processing with Class

CLASS zcl_parallel_processor DEFINITION.
PUBLIC SECTION.
TYPES: BEGIN OF ty_task,
id TYPE i,
status TYPE string,
result TYPE string,
END OF ty_task,
ty_tasks TYPE STANDARD TABLE OF ty_task WITH KEY id.
METHODS: run_parallel
IMPORTING it_items TYPE string_table
RETURNING VALUE(rt_results) TYPE ty_tasks.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_task_complete IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mt_tasks TYPE ty_tasks,
mv_completed TYPE i,
mv_total TYPE i.
ENDCLASS.
CLASS zcl_parallel_processor IMPLEMENTATION.
METHOD run_parallel.
mv_total = lines( it_items ).
mv_completed = 0.
CLEAR mt_tasks.
DATA(lv_task_id) = 0.
LOOP AT it_items INTO DATA(lv_item).
lv_task_id = lv_task_id + 1.
APPEND VALUE #(
id = lv_task_id
status = 'RUNNING'
) TO mt_tasks.
" Asynchronous call
DATA(lv_task_name) = |TASK{ lv_task_id }|. " Task ID passed as a data object
CALL FUNCTION 'Z_PROCESS_ITEM' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_task_complete ON END OF TASK
EXPORTING
iv_item = lv_item
iv_task_id = lv_task_id.
ENDLOOP.
" Wait until all complete
WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
rt_results = mt_tasks.
ENDMETHOD.
METHOD on_task_complete.
DATA: lv_result TYPE string,
lv_task_id TYPE i.
RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
IMPORTING
ev_result = lv_result
ev_task_id = lv_task_id.
" Save result
MODIFY mt_tasks FROM VALUE #(
id = lv_task_id
status = 'COMPLETED'
result = lv_result
) TRANSPORTING status result WHERE id = lv_task_id.
mv_completed = mv_completed + 1.
ENDMETHOD.
ENDCLASS.

3. Batch Processing with Chunks

CLASS zcl_batch_parallel DEFINITION.
PUBLIC SECTION.
CONSTANTS: c_max_tasks TYPE i VALUE 10.
METHODS: process_in_batches
IMPORTING it_data TYPE ty_data_tab
RETURNING VALUE(rt_results) TYPE ty_result_tab.
" Callback methods must be public
METHODS: on_batch_complete IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mv_active_tasks TYPE i,
mv_completed TYPE i,
mt_results TYPE ty_result_tab.
ENDCLASS.
CLASS zcl_batch_parallel IMPLEMENTATION.
METHOD process_in_batches.
DATA: lt_chunk TYPE ty_data_tab,
lv_chunk_size TYPE i,
lv_task_no TYPE i.
" Calculate chunk size
lv_chunk_size = COND #(
WHEN lines( it_data ) <= c_max_tasks THEN 1
ELSE lines( it_data ) / c_max_tasks + 1
).
mv_active_tasks = 0.
mv_completed = 0.
CLEAR mt_results.
" Split data into chunks and process in parallel
DATA(lv_start) = 1.
WHILE lv_start <= lines( it_data ).
lv_task_no = lv_task_no + 1.
" Create chunk
CLEAR lt_chunk.
LOOP AT it_data INTO DATA(ls_data) FROM lv_start.
APPEND ls_data TO lt_chunk.
IF lines( lt_chunk ) >= lv_chunk_size.
EXIT.
ENDIF.
ENDLOOP.
" Start task
DATA(lv_task_name) = |BATCH{ lv_task_no }|. " Task ID passed as a data object
CALL FUNCTION 'Z_PROCESS_BATCH' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_batch_complete ON END OF TASK
EXPORTING
iv_batch_id = lv_task_no
TABLES
it_data = lt_chunk.
mv_active_tasks = mv_active_tasks + 1.
lv_start = lv_start + lv_chunk_size.
" Limit max concurrent tasks
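" mv_active_tasks counts started tasks, so compare started minus completed
IF mv_active_tasks - mv_completed >= c_max_tasks.
WAIT UNTIL mv_active_tasks - mv_completed < c_max_tasks UP TO 60 SECONDS.
ENDIF.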
ENDWHILE.
" Wait for all remaining tasks
WAIT UNTIL mv_active_tasks = mv_completed UP TO 300 SECONDS.
rt_results = mt_results.
ENDMETHOD.
METHOD on_batch_complete.
DATA: lt_batch_results TYPE ty_result_tab.
RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_BATCH'
TABLES
et_results = lt_batch_results.
APPEND LINES OF lt_batch_results TO mt_results.
mv_completed = mv_completed + 1.
ENDMETHOD.
ENDCLASS.

4. Using Server Groups

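DATA: lv_group TYPE rzllitab-classname,
lv_free_wps TYPE i.
" Determine server group
" (inline declarations are not possible in CALL FUNCTION parameter lists)
CALL FUNCTION 'SPBT_INITIALIZE'
EXPORTING
group_name = 'parallel_generators'
IMPORTING
free_pbt_wps = lv_free_wps
EXCEPTIONS
invalid_group_name = 1
internal_error = 2
pbt_env_already_initialized = 3
currently_no_resources_avail = 4
no_pbt_resources_found = 5
cant_init_different_pbt_groups = 6
OTHERS = 7.
IF sy-subrc = 0 OR sy-subrc = 3.
" Group is usable (3 = environment was already initialized)
lv_group = 'parallel_generators'.
ELSE.
" Fallback: an initial group name addresses all available application servers
CLEAR lv_group.
ENDIF.
" Call with server group
CALL FUNCTION 'Z_HEAVY_CALCULATION' STARTING NEW TASK 'CALC1'
DESTINATION IN GROUP lv_group
CALLING on_calculation_done ON END OF TASK
EXPORTING
iv_param = lv_param
EXCEPTIONS
communication_failure = 1
system_failure = 2
resource_failure = 3.
IF sy-subrc = 3.
" No free work process in the group: retry later or process synchronously
ENDIF.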

5. Resource Management

CLASS zcl_resource_manager DEFINITION.
PUBLIC SECTION.
METHODS: get_available_processes
RETURNING VALUE(rv_count) TYPE i.
METHODS: process_with_throttling
IMPORTING it_items TYPE string_table.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_item_done IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mv_max_parallel TYPE i VALUE 5,
mv_running TYPE i,
mv_completed TYPE i.
ENDCLASS.
CLASS zcl_resource_manager IMPLEMENTATION.
METHOD get_available_processes.
" Determine free work processes
CALL FUNCTION 'SPBT_INITIALIZE'
EXPORTING
group_name = 'parallel_generators'
IMPORTING
free_pbt_wps = rv_count
EXCEPTIONS
OTHERS = 1.
IF sy-subrc <> 0.
rv_count = 3. " Fallback
ENDIF.
" Limit maximum
IF rv_count > mv_max_parallel.
rv_count = mv_max_parallel.
ENDIF.
" Never return less than 1, otherwise the throttling loop would not terminate
IF rv_count < 1.
rv_count = 1.
ENDIF.
ENDMETHOD.
METHOD process_with_throttling.
DATA(lv_max) = get_available_processes( ).
mv_running = 0. " Number of tasks started so far
mv_completed = 0.
DATA(lv_task_no) = 0.
LOOP AT it_items INTO DATA(lv_item).
lv_task_no = lv_task_no + 1.
" Wait if maximum reached
WHILE mv_running - mv_completed >= lv_max.
WAIT UNTIL mv_completed > ( mv_running - lv_max ) UP TO 5 SECONDS.
ENDWHILE.
" Start task
DATA(lv_task_name) = |PROC{ lv_task_no }|.
CALL FUNCTION 'Z_PROCESS_ITEM' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_item_done ON END OF TASK
EXPORTING
iv_item = lv_item.
mv_running = mv_running + 1.
ENDLOOP.
" Wait for all
WAIT UNTIL mv_completed >= mv_running UP TO 600 SECONDS.
ENDMETHOD.
METHOD on_item_done.
" Fetch the result to close the task, then count it as completed
RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
EXCEPTIONS
communication_failure = 1
system_failure = 2
OTHERS = 3.
mv_completed = mv_completed + 1.
ENDMETHOD.
ENDCLASS.

6. Error Handling

CLASS zcl_parallel_with_errors DEFINITION.
PUBLIC SECTION.
TYPES: BEGIN OF ty_result,
task_id TYPE i,
success TYPE abap_bool,
message TYPE string,
data TYPE string,
END OF ty_result,
ty_results TYPE STANDARD TABLE OF ty_result WITH KEY task_id.
METHODS: process_with_error_handling
IMPORTING it_items TYPE string_table
RETURNING VALUE(rt_results) TYPE ty_results.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_risky_complete IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mt_results TYPE ty_results,
mv_completed TYPE i,
mv_total TYPE i.
ENDCLASS.
CLASS zcl_parallel_with_errors IMPLEMENTATION.
METHOD process_with_error_handling.
DATA: lv_message TYPE c LENGTH 255.
mv_total = lines( it_items ).
mv_completed = 0.
CLEAR mt_results.
DATA(lv_task_id) = 0.
LOOP AT it_items INTO DATA(lv_item).
lv_task_id = lv_task_id + 1.
" Register the task first so that a timeout can be attributed to it later
APPEND VALUE #( task_id = lv_task_id message = 'Pending' )
TO mt_results ASSIGNING FIELD-SYMBOL(<ls_new>).
CLEAR lv_message.
DATA(lv_task_name) = |RISK{ lv_task_id }|.
CALL FUNCTION 'Z_RISKY_OPERATION' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_risky_complete ON END OF TASK
EXPORTING
iv_item = lv_item
iv_task_id = lv_task_id
EXCEPTIONS
communication_failure = 1 MESSAGE lv_message
system_failure = 2 MESSAGE lv_message
resource_failure = 3
OTHERS = 4.
IF sy-subrc <> 0.
" Start errors are reported as classic exceptions, not class-based ones
<ls_new>-message = COND #( WHEN lv_message IS NOT INITIAL THEN lv_message
ELSE 'Task could not be started' ).
mv_completed = mv_completed + 1.
ENDIF.
ENDLOOP.
" Wait for all started tasks (at most 5 minutes)
WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
IF mv_completed < mv_total.
" Mark tasks that never reported back as timed out
LOOP AT mt_results ASSIGNING FIELD-SYMBOL(<ls_pending>) WHERE message = 'Pending'.
<ls_pending>-message = 'Timeout'.
ENDLOOP.
ENDIF.
rt_results = mt_results.
ENDMETHOD.
" Callback with error handling
METHOD on_risky_complete.
DATA: lv_result TYPE string,
lv_task_id TYPE i,
lv_subrc TYPE sy-subrc,
lv_message TYPE c LENGTH 255.
RECEIVE RESULTS FROM FUNCTION 'Z_RISKY_OPERATION'
IMPORTING
ev_result = lv_result
EXCEPTIONS
communication_failure = 1 MESSAGE lv_message
system_failure = 2 MESSAGE lv_message
OTHERS = 3.
lv_subrc = sy-subrc.
" No values are returned on failure, so derive the ID from the task name 'RISK<n>'
lv_task_id = substring( val = p_task off = 4 ).
READ TABLE mt_results ASSIGNING FIELD-SYMBOL(<ls_result>) WITH KEY task_id = lv_task_id.
IF sy-subrc = 0.
IF lv_subrc = 0.
<ls_result>-success = abap_true.
<ls_result>-data = lv_result.
CLEAR <ls_result>-message.
ELSE.
<ls_result>-message = COND #( WHEN lv_message IS NOT INITIAL THEN lv_message
ELSE |Error { lv_subrc }| ).
ENDIF.
ENDIF.
mv_completed = mv_completed + 1.
ENDMETHOD.
ENDCLASS.

7. RFC Function Module for Parallelization

FUNCTION z_parallel_worker.
*"----------------------------------------------------------------------
*"*"Local Interface:
*" IMPORTING
*" VALUE(IV_TASK_ID) TYPE I
*" VALUE(IT_DATA) TYPE TY_DATA_TAB
*" EXPORTING
*" VALUE(ET_RESULTS) TYPE TY_RESULT_TAB
*" VALUE(EV_TASK_ID) TYPE I
*"----------------------------------------------------------------------
ev_task_id = iv_task_id.
LOOP AT it_data INTO DATA(ls_data).
" Processing
DATA(ls_result) = VALUE ty_result(
id = ls_data-id
status = 'PROCESSED'
value = ls_data-value * 2
).
APPEND ls_result TO et_results.
ENDLOOP.
ENDFUNCTION.

8. CL_ABAP_PARALLEL for Modern Parallelization

" Sketch of the subclass pattern of CL_ABAP_PARALLEL (redefine DO, call RUN).
" Check the exact signatures and type names in your release before use.
CLASS zcl_modern_parallel DEFINITION INHERITING FROM cl_abap_parallel.
PUBLIC SECTION.
METHODS: do REDEFINITION.
CLASS-METHODS: run_parallel
IMPORTING it_items TYPE string_table
RETURNING VALUE(rt_results) TYPE string_table.
ENDCLASS.
CLASS zcl_modern_parallel IMPLEMENTATION.
METHOD do.
" This method is executed in parallel, once per input entry
DATA: lv_item TYPE string.
IMPORT item = lv_item FROM DATA BUFFER p_in.
DATA(lv_result) = |Processed: { lv_item }|.
" Serialize result
EXPORT result = lv_result TO DATA BUFFER p_out.
ENDMETHOD.
METHOD run_parallel.
DATA: lt_in TYPE cl_abap_parallel=>t_in_tab,
lt_out TYPE cl_abap_parallel=>t_out_tab,
lv_in TYPE xstring,
lv_result TYPE string.
" Serialize one input entry per task
LOOP AT it_items INTO DATA(lv_item).
EXPORT item = lv_item TO DATA BUFFER lv_in.
APPEND lv_in TO lt_in.
ENDLOOP.
" Execute in parallel
NEW zcl_modern_parallel( )->run(
EXPORTING
p_in_tab = lt_in
IMPORTING
p_out_tab = lt_out ).
" Collect results (an empty result buffer indicates a failed task)
LOOP AT lt_out INTO DATA(ls_out).
IF ls_out-result IS NOT INITIAL.
IMPORT result = lv_result FROM DATA BUFFER ls_out-result.
IF sy-subrc = 0.
APPEND lv_result TO rt_results.
ENDIF.
ENDIF.
ENDLOOP.
ENDMETHOD.
ENDCLASS.

9. Progress Indicator

CLASS zcl_parallel_progress DEFINITION.
PUBLIC SECTION.
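METHODS: process_with_progress
IMPORTING it_items TYPE string_table.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_progress_update IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mv_total TYPE i,
mv_completed TYPE i,
mv_errors TYPE i.
METHODS: update_progress.
ENDCLASS.
CLASS zcl_parallel_progress IMPLEMENTATION.
METHOD process_with_progress.
mv_total = lines( it_items ).
mv_completed = 0.
mv_errors = 0.
IF mv_total = 0.
RETURN. " Nothing to do; also avoids a division by zero in update_progress
ENDIF.
DATA(lv_task_no) = 0.
" Initialize progress indicator
CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
EXPORTING
percentage = 0
text = |Starting { mv_total } parallel tasks...|.
" Start tasks
LOOP AT it_items INTO DATA(lv_item).
lv_task_no = lv_task_no + 1.
DATA(lv_task_name) = |PROG{ lv_task_no }|.
CALL FUNCTION 'Z_PROCESS_ITEM' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_progress_update ON END OF TASK
EXPORTING
iv_item = lv_item.
ENDLOOP.
" Wait and update progress (at most 600 one-second rounds)
DATA(lv_waited) = 0.
WHILE mv_completed < mv_total AND lv_waited < 600.
WAIT UNTIL mv_completed >= mv_total UP TO 1 SECONDS.
lv_waited = lv_waited + 1.
update_progress( ).
ENDWHILE.
" Completion
CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
EXPORTING
percentage = 100
text = |Done: { mv_completed - mv_errors } OK, { mv_errors } errors|.
ENDMETHOD.
METHOD on_progress_update.
" Count failed tasks as errors; every finished task counts as completed
RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
EXCEPTIONS
communication_failure = 1
system_failure = 2
OTHERS = 3.
IF sy-subrc <> 0.
mv_errors = mv_errors + 1.
ENDIF.
mv_completed = mv_completed + 1.
ENDMETHOD.
METHOD update_progress.
DATA(lv_percent) = mv_completed * 100 / mv_total.
CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
EXPORTING
percentage = lv_percent
text = |{ mv_completed }/{ mv_total } processed ({ mv_errors } errors)|.
ENDMETHOD.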
ENDCLASS.

10. Map-Reduce Pattern

CLASS zcl_map_reduce DEFINITION.
PUBLIC SECTION.
TYPES: BEGIN OF ty_key_value,
key TYPE string,
value TYPE i,
END OF ty_key_value,
ty_key_values TYPE STANDARD TABLE OF ty_key_value WITH KEY key.
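METHODS: map_reduce
IMPORTING it_data TYPE string_table
RETURNING VALUE(rt_result) TYPE ty_key_values.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_map_complete IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mt_map_results TYPE ty_key_values,
mv_map_done TYPE i,
mv_map_total TYPE i.
ENDCLASS.
CLASS zcl_map_reduce IMPLEMENTATION.
METHOD map_reduce.
" MAP Phase - parallel
mv_map_total = lines( it_data ).
mv_map_done = 0.
CLEAR mt_map_results.
DATA(lv_chunk_no) = 0.
LOOP AT it_data INTO DATA(lv_data).
lv_chunk_no = lv_chunk_no + 1.
DATA(lv_task_name) = |MAP{ lv_chunk_no }|.
CALL FUNCTION 'Z_MAP_FUNCTION' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_map_complete ON END OF TASK
EXPORTING
iv_data = lv_data.
ENDLOOP.
" Wait for all map tasks
WAIT UNTIL mv_map_done >= mv_map_total UP TO 300 SECONDS.
" REDUCE Phase - local (aggregate)
DATA: lt_grouped TYPE SORTED TABLE OF ty_key_value WITH UNIQUE KEY key.
LOOP AT mt_map_results INTO DATA(ls_result).
READ TABLE lt_grouped ASSIGNING FIELD-SYMBOL(<fs_group>)
WITH TABLE KEY key = ls_result-key.
IF sy-subrc = 0.
<fs_group>-value = <fs_group>-value + ls_result-value.
ELSE.
INSERT ls_result INTO TABLE lt_grouped.
ENDIF.
ENDLOOP.
rt_result = lt_grouped.
ENDMETHOD.
METHOD on_map_complete.
DATA: lt_pairs TYPE ty_key_values.
" et_key_values is an assumed parameter name of the hypothetical Z_MAP_FUNCTION
RECEIVE RESULTS FROM FUNCTION 'Z_MAP_FUNCTION'
IMPORTING
et_key_values = lt_pairs
EXCEPTIONS
communication_failure = 1
system_failure = 2
OTHERS = 3.
IF sy-subrc = 0.
APPEND LINES OF lt_pairs TO mt_map_results.
ENDIF.
mv_map_done = mv_map_done + 1.
ENDMETHOD.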
ENDCLASS.

11. Transactional Parallel Processing

CLASS zcl_transactional_parallel DEFINITION.
PUBLIC SECTION.
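METHODS: process_and_commit
IMPORTING it_items TYPE ty_item_tab
RETURNING VALUE(rv_success) TYPE abap_bool.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_trans_complete IMPORTING p_task TYPE clike.
PRIVATE SECTION.
DATA: mv_all_success TYPE abap_bool VALUE abap_true,
mv_completed TYPE i,
mv_total TYPE i.
ENDCLASS.
CLASS zcl_transactional_parallel IMPLEMENTATION.
METHOD process_and_commit.
mv_total = lines( it_items ).
mv_completed = 0.
mv_all_success = abap_true.
DATA(lv_task_no) = 0.
" Parallel processing (without commit).
" Each task runs in its own session and therefore in its own LUW.
LOOP AT it_items INTO DATA(ls_item).
lv_task_no = lv_task_no + 1.
DATA(lv_task_name) = |TRANS{ lv_task_no }|.
CALL FUNCTION 'Z_PROCESS_NO_COMMIT' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_trans_complete ON END OF TASK
EXPORTING
is_item = ls_item.
ENDLOOP.
" Wait for all
WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
IF mv_completed < mv_total.
mv_all_success = abap_false. " A timeout counts as a failure
ENDIF.
" Commit or rollback. This affects only the caller's own LUW;
" changes written inside the tasks cannot be committed or rolled back from here.
IF mv_all_success = abap_true.
CALL FUNCTION 'BAPI_TRANSACTION_COMMIT'
EXPORTING
wait = abap_true.
rv_success = abap_true.
ELSE.
CALL FUNCTION 'BAPI_TRANSACTION_ROLLBACK'.
rv_success = abap_false.
ENDIF.
ENDMETHOD.
METHOD on_trans_complete.
DATA: lv_success TYPE abap_bool.
" ev_success is an assumed parameter name of the hypothetical Z_PROCESS_NO_COMMIT
RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_NO_COMMIT'
IMPORTING
ev_success = lv_success
EXCEPTIONS
communication_failure = 1
system_failure = 2
OTHERS = 3.
IF sy-subrc <> 0 OR lv_success = abap_false.
mv_all_success = abap_false.
ENDIF.
mv_completed = mv_completed + 1.
ENDMETHOD.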
ENDCLASS.

12. Practical Example: Mass Data Export

CLASS zcl_mass_export DEFINITION.
PUBLIC SECTION.
METHODS: export_customers_parallel
IMPORTING iv_country TYPE land1
RETURNING VALUE(rv_file) TYPE string.
" Callback: must be public with a single parameter p_task TYPE clike
METHODS: on_chunk_loaded IMPORTING p_task TYPE clike.
PRIVATE SECTION.
CONSTANTS: c_chunk_size TYPE i VALUE 1000.
DATA: mt_all_data TYPE ty_customer_tab,
mv_completed TYPE i,
mv_total TYPE i.
ENDCLASS.
CLASS zcl_mass_export IMPLEMENTATION.
METHOD export_customers_parallel.
" Load customer numbers
SELECT kunnr FROM kna1
WHERE land1 = @iv_country
INTO TABLE @DATA(lt_customers).
" Number of chunks, rounded up (the / operator would round to the nearest integer)
mv_total = ( lines( lt_customers ) + c_chunk_size - 1 ) DIV c_chunk_size.
mv_completed = 0.
CLEAR mt_all_data.
" Load chunks in parallel
DATA: lv_from TYPE i VALUE 1,
lv_chunk_no TYPE i VALUE 0.
WHILE lv_from <= lines( lt_customers ).
lv_chunk_no = lv_chunk_no + 1.
" Calculate chunk range
DATA(lv_to) = lv_from + c_chunk_size - 1.
IF lv_to > lines( lt_customers ).
lv_to = lines( lt_customers ).
ENDIF.
" Customer numbers for chunk
DATA(lt_chunk_keys) = VALUE ty_kunnr_tab(
FOR i = lv_from WHILE i <= lv_to
( lt_customers[ i ]-kunnr )
).
" Load in parallel
DATA(lv_task_name) = |EXP{ lv_chunk_no }|.
CALL FUNCTION 'Z_LOAD_CUSTOMER_DATA' STARTING NEW TASK lv_task_name
DESTINATION 'NONE'
CALLING on_chunk_loaded ON END OF TASK
EXPORTING
it_customers = lt_chunk_keys.
lv_from = lv_to + 1.
ENDWHILE.
" Wait
WAIT UNTIL mv_completed >= mv_total UP TO 600 SECONDS.
IF mv_completed < mv_total.
RETURN. " Timeout: do not write an incomplete file
ENDIF.
" Write file
rv_file = |/tmp/customers_{ iv_country }_{ sy-datum }.csv|.
OPEN DATASET rv_file FOR OUTPUT IN TEXT MODE ENCODING UTF-8.
IF sy-subrc = 0.
TRANSFER 'KUNNR;NAME1;ORT01;LAND1' TO rv_file.
LOOP AT mt_all_data INTO DATA(ls_data).
DATA(lv_line) = |{ ls_data-kunnr };{ ls_data-name1 };{ ls_data-ort01 };{ ls_data-land1 }|.
TRANSFER lv_line TO rv_file.
ENDLOOP.
CLOSE DATASET rv_file.
ENDIF.
ENDMETHOD.
METHOD on_chunk_loaded.
DATA: lt_chunk_data TYPE ty_customer_tab.
" et_data is an assumed parameter name of the hypothetical Z_LOAD_CUSTOMER_DATA
RECEIVE RESULTS FROM FUNCTION 'Z_LOAD_CUSTOMER_DATA'
IMPORTING
et_data = lt_chunk_data
EXCEPTIONS
communication_failure = 1
system_failure = 2
OTHERS = 3.
IF sy-subrc = 0.
APPEND LINES OF lt_chunk_data TO mt_all_data.
ENDIF.
mv_completed = mv_completed + 1.
ENDMETHOD.
ENDCLASS.

Destination Options

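Destination         Description
'NONE'              Local server, same instance
IN GROUP group      Application servers of the named RFC server group
IN GROUP DEFAULT    All currently available application servers
RFC destination     External system (destination maintained in SM59)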

WAIT Variants

" Wait for condition
WAIT UNTIL gv_done = abap_true.
" With timeout
WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.
" Full form of the statement (WAIT UNTIL is its short form)
WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_count > 0.
" Multiple conditions
WAIT UNTIL gv_task1_done = abap_true
AND gv_task2_done = abap_true
UP TO 120 SECONDS.
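
The WAIT variants above set sy-subrc, which none of the examples evaluates. The following is a minimal sketch of such a check, assuming the return codes described for the statement (0: condition met, 4: no open asynchronous call with a registered callback, 8: time limit reached) and a release that supports the full form. Z_LONG_RUNNING_TASK is the hypothetical function module from example 1; the report name is made up for illustration.

```abap
REPORT zparallel_wait_check.

DATA gv_done TYPE abap_bool.

START-OF-SELECTION.
  " Z_LONG_RUNNING_TASK is the hypothetical function module from example 1
  CALL FUNCTION 'Z_LONG_RUNNING_TASK' STARTING NEW TASK 'TASK1'
    DESTINATION 'NONE'
    PERFORMING on_task_complete ON END OF TASK
    EXPORTING
      iv_input = 'Test'.

  WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_done = abap_true UP TO 60 SECONDS.
  CASE sy-subrc.
    WHEN 0.
      WRITE: / 'Condition met: the callback has run'.
    WHEN 4.
      WRITE: / 'No open asynchronous call with a callback'.
    WHEN 8.
      WRITE: / 'Timeout: the task did not finish in time'.
  ENDCASE.

FORM on_task_complete USING p_task TYPE clike.
  " Fetch the result to close the task, then signal completion
  RECEIVE RESULTS FROM FUNCTION 'Z_LONG_RUNNING_TASK'
    EXCEPTIONS
      communication_failure = 1
      system_failure        = 2
      OTHERS                = 3.
  gv_done = abap_true.
ENDFORM.
```

Checking the return code instead of only the flag distinguishes a real timeout from the case where no task was started at all.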

Best Practices

  • Keep the number of parallel tasks within the number of available work processes.
  • Use SPBT_INITIALIZE to check a server group and its free work processes.
  • Handle communication_failure and system_failure via EXCEPTIONS in both CALL FUNCTION and RECEIVE RESULTS.
  • Always define a timeout (UP TO … SECONDS).
  • Do not hold DB locks across tasks, to avoid deadlocks.
  • The callback form or method must exist at runtime; a callback method must be public and take p_task TYPE clike.
  • Tune the data volume per task (not too large, not too small).
  • RFC function modules must be remote-enabled.
  • Issue COMMIT WORK in the caller only after all tasks complete; each task runs in its own session and LUW.
  • Combine with HTTP Client for parallel API calls.
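
The first practice in the list can be sketched as a retry loop. With DESTINATION IN GROUP, the start of a task fails with resource_failure when no work process is free, so the caller can pause and try again. This is a sketch under assumptions, not a definitive implementation: the report name, the retry limit of 10 and the one-second pause are made up, and Z_PROCESS_ITEM is the hypothetical function module used in the examples above.

```abap
REPORT zparallel_retry.

DATA: gv_started   TYPE i,
      gv_completed TYPE i.

START-OF-SELECTION.
  DATA(gt_items) = VALUE string_table( ( `A` ) ( `B` ) ( `C` ) ).

  LOOP AT gt_items INTO DATA(gv_item).
    DATA(gv_task_name) = |ITEM{ sy-tabix }|.
    DO 10 TIMES.                          " hypothetical retry limit
      CALL FUNCTION 'Z_PROCESS_ITEM' STARTING NEW TASK gv_task_name
        DESTINATION IN GROUP DEFAULT
        PERFORMING on_item_done ON END OF TASK
        EXPORTING
          iv_item               = gv_item
        EXCEPTIONS
          communication_failure = 1
          system_failure        = 2
          resource_failure      = 3
          OTHERS                = 4.
      IF sy-subrc = 0.
        gv_started = gv_started + 1.
        EXIT.
      ELSEIF sy-subrc = 3.
        " No free work process in the group: pause, then retry
        WAIT UP TO 1 SECONDS.
      ELSE.
        WRITE: / 'Start failed for', gv_task_name.
        EXIT.
      ENDIF.
    ENDDO.
    " An item that still fails after all retries is skipped in this sketch
  ENDLOOP.

  WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_completed >= gv_started UP TO 300 SECONDS.
  WRITE: / gv_completed, 'of', gv_started, 'tasks finished'.

FORM on_item_done USING p_task TYPE clike.
  " Fetch the result to close the task, then count it as completed
  RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
    EXCEPTIONS
      communication_failure = 1
      system_failure        = 2
      OTHERS                = 3.
  gv_completed = gv_completed + 1.
ENDFORM.
```

Only tasks that were actually started are counted, so the final wait cannot block on items whose start failed.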