Mass Data Processing Best Practices in ABAP Cloud

Category: Best Practices
Published
Author: Johannes

Processing large data volumes poses particular challenges in ABAP Cloud. Memory limits, timeout boundaries, and the cloud architecture demand well-thought-out strategies. This article presents proven patterns for efficient mass data processing.

Challenges with Mass Data

Typical Problems

Problem              | Cause                            | Impact
Memory overflow      | Loading all data at once         | Program termination
Timeout              | Excessive runtime                | HTTP 504 Gateway Timeout
Lock contention      | Parallel access to the same data | Performance degradation
Inconsistency        | Abort during processing          | Partially processed data
Lack of transparency | No status tracking               | Users left in the dark

Cloud-Specific Limits

Strict resource limits apply in the SAP BTP ABAP Environment:

Work process timeout: 600 seconds (max)
Memory per dialog work process: ~2 GB
Extended memory: limited per session
Database connections: pool-based
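A long-running job should therefore watch its own runtime budget and stop in an orderly fashion before the platform does. A minimal sketch of such a guard (the class name and the 540-second budget are illustrative assumptions, not part of any framework):

```abap
CLASS zcl_runtime_guard DEFINITION PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    METHODS:
      constructor
        IMPORTING iv_budget_seconds TYPE i DEFAULT 540, " stay safely below 600 s
      is_budget_exceeded
        RETURNING VALUE(rv_exceeded) TYPE abap_bool.
  PRIVATE SECTION.
    DATA: mv_start  TYPE utclong,
          mv_budget TYPE i.
ENDCLASS.

CLASS zcl_runtime_guard IMPLEMENTATION.
  METHOD constructor.
    mv_start  = utclong_current( ).
    mv_budget = iv_budget_seconds.
  ENDMETHOD.
  METHOD is_budget_exceeded.
    " Seconds elapsed since the guard was created
    rv_exceeded = xsdbool(
      utclong_diff( high = utclong_current( )
                    low  = mv_start ) >= mv_budget ).
  ENDMETHOD.
ENDCLASS.
```

Checked after every package, such a guard lets the job commit what is already done and reschedule the remainder instead of running into an HTTP 504.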

Batch Processing with Packages

Basic Principle: Divide and Conquer

Instead of processing all the data at once, it is split into manageable packages (chunks):

CLASS zcl_mass_processor DEFINITION
  PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    CONSTANTS:
      c_package_size TYPE i VALUE 1000.
    METHODS:
      process_all_customers
        RETURNING VALUE(rs_result) TYPE ztt_processing_result.
  PRIVATE SECTION.
    METHODS:
      get_customer_count
        RETURNING VALUE(rv_count) TYPE i,
      get_customer_package
        IMPORTING iv_offset           TYPE i
                  iv_limit            TYPE i
        RETURNING VALUE(rt_customers) TYPE ztt_customers,
      process_customer_package
        IMPORTING it_customers      TYPE ztt_customers
        RETURNING VALUE(rt_results) TYPE ztt_processing_result,
      process_single_customer
        IMPORTING is_customer      TYPE zcustomer
        RETURNING VALUE(rs_result) TYPE LINE OF ztt_processing_result.
ENDCLASS.
CLASS zcl_mass_processor IMPLEMENTATION.
  METHOD process_all_customers.
    DATA lv_offset TYPE i VALUE 0.
    " Determine the total number of records
    DATA(lv_total) = get_customer_count( ).
    " Process in packages
    WHILE lv_offset < lv_total.
      " Load one package
      DATA(lt_customers) = get_customer_package(
        iv_offset = lv_offset
        iv_limit  = c_package_size ).
      " Process the package
      DATA(lt_package_results) = process_customer_package( lt_customers ).
      " Collect the results
      APPEND LINES OF lt_package_results TO rs_result.
      " COMMIT WORK after each package
      COMMIT WORK AND WAIT.
      " Advance the offset
      lv_offset = lv_offset + c_package_size.
      " Release memory
      CLEAR lt_customers.
    ENDWHILE.
  ENDMETHOD.
  METHOD get_customer_count.
    SELECT COUNT(*) FROM zcustomer
      WHERE processing_status = @zif_constants=>c_status_pending
      INTO @rv_count.
  ENDMETHOD.
  METHOD get_customer_package.
    SELECT * FROM zcustomer
      WHERE processing_status = @zif_constants=>c_status_pending
      ORDER BY customer_id
      INTO TABLE @rt_customers
      UP TO @iv_limit ROWS
      OFFSET @iv_offset.
  ENDMETHOD.
  METHOD process_customer_package.
    LOOP AT it_customers INTO DATA(ls_customer).
      TRY.
          " Execute the business logic
          DATA(ls_result) = process_single_customer( ls_customer ).
          APPEND ls_result TO rt_results.
        CATCH cx_root INTO DATA(lx_error).
          " Log the error, but continue
          APPEND VALUE #(
            customer_id = ls_customer-customer_id
            status      = 'ERROR'
            message     = lx_error->get_text( )
          ) TO rt_results.
      ENDTRY.
    ENDLOOP.
  ENDMETHOD.
ENDCLASS.

Optimal Package Size

The ideal package size depends on several factors:

Factor          | Small packages (100-500) | Large packages (1000-5000)
Memory          | Lower consumption        | Higher consumption
Overhead        | More DB round trips      | Less overhead
Restart         | Less data loss           | More data loss
Parallelization | Easier to distribute     | Harder to distribute

Recommendation: Start with 1000 records and adjust based on measurements.
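This tuning can also be automated: measure how long one package takes and scale the size towards a target duration. A simple sketch (the method name, the 5-second target, and the bounds are illustrative assumptions):

```abap
METHOD adjust_package_size.
  " iv_current_size: package size used for the last package
  " iv_duration_sec: measured processing time of that package in seconds
  CONSTANTS: c_target_sec TYPE i VALUE 5,
             c_min_size   TYPE i VALUE 100,
             c_max_size   TYPE i VALUE 5000.
  IF iv_duration_sec <= 0.
    rv_new_size = iv_current_size.
    RETURN.
  ENDIF.
  " Scale so that the next package takes roughly c_target_sec
  rv_new_size = iv_current_size * c_target_sec / iv_duration_sec.
  " Clamp to sensible bounds
  rv_new_size = nmax( val1 = c_min_size
                      val2 = nmin( val1 = c_max_size
                                   val2 = rv_new_size ) ).
ENDMETHOD.
```

Feeding the measured duration of each package back into the next iteration keeps the job close to the target duration even when record complexity varies.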

Parallelization with bgPF

For maximum performance, packages can be processed in parallel. The Background Processing Framework (bgPF) offers an elegant solution for this.

Parallel Processing Architecture

┌──────────────────────────────────────────┐
│ Main Process                             │
│  1. Split the data into packages         │
│  2. Start a bgPF job for each package    │
│  3. Wait for completion                  │
│  4. Aggregate the results                │
└──────────────────────────────────────────┘
      │           │           │           │
      ▼           ▼           ▼           ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│Worker 1 │ │Worker 2 │ │Worker 3 │ │Worker 4 │
│Package 1│ │Package 2│ │Package 3│ │Package 4│
└─────────┘ └─────────┘ └─────────┘ └─────────┘

Implementation with bgPF

CLASS zcl_parallel_processor DEFINITION
  PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    INTERFACES if_bgmc_op_single_tx_uncontr.
    TYPES:
      BEGIN OF ty_job_params,
        package_id  TYPE i,
        offset      TYPE i,
        limit       TYPE i,
        total_count TYPE i,
      END OF ty_job_params.
    TYPES ty_job_tab TYPE STANDARD TABLE OF REF TO if_bgmc_op_single_tx_uncontr WITH EMPTY KEY.
    METHODS:
      start_parallel_processing
        IMPORTING iv_total_records   TYPE i
                  iv_package_size    TYPE i DEFAULT 1000
                  iv_max_parallelism TYPE i DEFAULT 4,
      execute_package
        IMPORTING is_params TYPE ty_job_params.
  PRIVATE SECTION.
    DATA ms_params TYPE ty_job_params.
    METHODS:
      start_job_batch
        IMPORTING it_jobs TYPE ty_job_tab,
      process_single_customer
        IMPORTING is_customer TYPE zcustomer.
ENDCLASS.
CLASS zcl_parallel_processor IMPLEMENTATION.
  METHOD start_parallel_processing.
    DATA: lt_jobs    TYPE TABLE OF REF TO if_bgmc_op_single_tx_uncontr,
          lv_offset  TYPE i VALUE 0,
          lv_package TYPE i VALUE 1.
    " Define the packages and schedule the jobs
    WHILE lv_offset < iv_total_records.
      " Job parameters
      DATA(ls_params) = VALUE ty_job_params(
        package_id  = lv_package
        offset      = lv_offset
        limit       = iv_package_size
        total_count = iv_total_records ).
      " Create a new processor instance for this package
      DATA(lo_processor) = NEW zcl_parallel_processor( ).
      lo_processor->ms_params = ls_params.
      " Register the job
      APPEND lo_processor TO lt_jobs.
      lv_offset  = lv_offset + iv_package_size.
      lv_package = lv_package + 1.
      " Limit the degree of parallelism
      IF lines( lt_jobs ) >= iv_max_parallelism.
        " Schedule this batch
        start_job_batch( lt_jobs ).
        CLEAR lt_jobs.
      ENDIF.
    ENDWHILE.
    " Schedule the remaining jobs
    IF lt_jobs IS NOT INITIAL.
      start_job_batch( lt_jobs ).
    ENDIF.
  ENDMETHOD.
  METHOD if_bgmc_op_single_tx_uncontr~execute.
    " Runs in the background worker
    execute_package( ms_params ).
  ENDMETHOD.
  METHOD execute_package.
    " Load the package from the database
    SELECT * FROM zcustomer
      WHERE processing_status = @zif_constants=>c_status_pending
      ORDER BY customer_id
      INTO TABLE @DATA(lt_customers)
      UP TO @is_params-limit ROWS
      OFFSET @is_params-offset.
    " Processing
    LOOP AT lt_customers INTO DATA(ls_customer).
      " Business logic
      process_single_customer( ls_customer ).
    ENDLOOP.
    " Commit inside the background job
    COMMIT WORK.
  ENDMETHOD.
ENDCLASS.

Scheduling bgPF Jobs

METHOD start_job_batch.
  DATA(lo_factory) = cl_bgmc_process_factory=>get_default( ).
  LOOP AT it_jobs INTO DATA(lo_job).
    TRY.
        " Register the operation as a background process
        DATA(lo_process) = lo_factory->create( ).
        lo_process->set_operation_tx_uncontrolled( lo_job ).
        lo_process->set_name( 'MASS_PROCESS' ).
        lo_process->save_for_execution( ).
      CATCH cx_bgmc INTO DATA(lx_error).
        " Error handling
        log_error( lx_error->get_text( ) ).
    ENDTRY.
  ENDLOOP.
  " The scheduled processes start with the next COMMIT WORK
  COMMIT WORK.
ENDMETHOD.

Memory-Efficient Processing

Streaming Instead of Bulk Load

Instead of loading all the data at once, you can process it with a database cursor:

METHOD process_with_cursor.
  DATA lt_buffer TYPE STANDARD TABLE OF zcustomer.
  " Open a cursor; WITH HOLD keeps it valid across COMMIT WORK
  " (a COMMIT inside a plain SELECT ... ENDSELECT loop would invalidate the cursor)
  OPEN CURSOR WITH HOLD @DATA(lv_cursor) FOR
    SELECT * FROM zcustomer
      WHERE processing_status = @zif_constants=>c_status_pending.
  DO.
    " Fetch the next package into the buffer
    FETCH NEXT CURSOR @lv_cursor
      INTO TABLE @lt_buffer PACKAGE SIZE 500.
    IF sy-subrc <> 0.
      EXIT.
    ENDIF.
    " Process the buffer
    LOOP AT lt_buffer INTO DATA(ls_customer).
      process_single_customer( ls_customer ).
    ENDLOOP.
    " Commit after each package
    COMMIT WORK AND WAIT.
    " Release memory explicitly
    CLEAR lt_buffer.
  ENDDO.
  CLOSE CURSOR @lv_cursor.
ENDMETHOD.

Memory Monitoring

METHOD check_memory_usage.
  " Check the current memory consumption
  DATA: lv_used  TYPE i,
        lv_peak  TYPE i,
        lv_limit TYPE i VALUE 2000000000. " ~2 GB; in practice, read from configuration
  CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE'
    IMPORTING
      used_memory = lv_used
      peak_memory = lv_peak.
  " Warn above 70% utilization (divide first to avoid integer overflow)
  IF lv_used > lv_limit / 10 * 7.
    log_warning( |Memory usage high: { lv_used } / { lv_limit }| ).
  ENDIF.
  " Pause at critical utilization
  IF lv_used > lv_limit / 10 * 9.
    " Force garbage collection
    cl_abap_memory_utilities=>do_garbage_collection( ).
    " Wait briefly
    WAIT UP TO 1 SECONDS.
  ENDIF.
ENDMETHOD.

Avoiding Unnecessary Data

" ❌ Bad: loading all fields
SELECT * FROM zcustomer INTO TABLE @DATA(lt_all).

" ✓ Good: only the fields you need
SELECT customer_id, customer_name, email
  FROM zcustomer
  INTO TABLE @DATA(lt_minimal).

" ✓ Better: aggregation in the database
SELECT country, COUNT(*) AS customer_count
  FROM zcustomer
  GROUP BY country
  INTO TABLE @DATA(lt_summary).

Progress Tracking and Logging

Progress Table

" Fortschritt in Datenbanktabelle speichern
DEFINE TABLE zprogress_log {
key client : abap.clnt;
key job_id : sysuuid_c32;
key timestamp : timestampl;
total_count : i;
processed : i;
success : i;
errors : i;
status : abap.char(10);
message : abap.string(1000);
}

Progress Tracker Class

CLASS zcl_progress_tracker DEFINITION
  PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    METHODS:
      constructor
        IMPORTING iv_job_id      TYPE sysuuid_c32
                  iv_total_count TYPE i,
      increment
        IMPORTING iv_success TYPE abap_bool DEFAULT abap_true
                  iv_message TYPE string OPTIONAL,
      complete
        IMPORTING iv_status  TYPE string DEFAULT 'COMPLETED'
                  iv_message TYPE string OPTIONAL,
      get_progress
        RETURNING VALUE(rs_progress) TYPE zprogress_log.
  PRIVATE SECTION.
    DATA:
      mv_job_id      TYPE sysuuid_c32,
      mv_total       TYPE i,
      mv_processed   TYPE i,
      mv_success     TYPE i,
      mv_errors      TYPE i,
      mv_last_update TYPE timestampl.
    METHODS:
      persist_progress.
ENDCLASS.
CLASS zcl_progress_tracker IMPLEMENTATION.
  METHOD constructor.
    mv_job_id = iv_job_id.
    mv_total  = iv_total_count.
    " Create the initial entry
    GET TIME STAMP FIELD mv_last_update.
    INSERT INTO zprogress_log VALUES @( VALUE #(
      job_id      = mv_job_id
      timestamp   = mv_last_update
      total_count = mv_total
      status      = 'RUNNING'
    ) ).
    COMMIT WORK.
  ENDMETHOD.
  METHOD increment.
    mv_processed = mv_processed + 1.
    IF iv_success = abap_true.
      mv_success = mv_success + 1.
    ELSE.
      mv_errors = mv_errors + 1.
    ENDIF.
    " Do not update on every single record (performance)
    IF mv_processed MOD 100 = 0.
      persist_progress( ).
    ENDIF.
  ENDMETHOD.
  METHOD persist_progress.
    GET TIME STAMP FIELD mv_last_update.
    UPDATE zprogress_log SET
        processed = @mv_processed,
        success   = @mv_success,
        errors    = @mv_errors,
        timestamp = @mv_last_update
      WHERE job_id = @mv_job_id.
    COMMIT WORK.
  ENDMETHOD.
  METHOD complete.
    GET TIME STAMP FIELD mv_last_update.
    UPDATE zprogress_log SET
        processed = @mv_processed,
        success   = @mv_success,
        errors    = @mv_errors,
        status    = @iv_status,
        message   = @iv_message,
        timestamp = @mv_last_update
      WHERE job_id = @mv_job_id.
    COMMIT WORK.
  ENDMETHOD.
  METHOD get_progress.
    SELECT SINGLE * FROM zprogress_log
      WHERE job_id = @mv_job_id
      INTO @rs_progress.
  ENDMETHOD.
ENDCLASS.

Integration into the Processing

METHOD process_with_tracking.
  " Initialize the tracker (cx_uuid_error handling omitted for brevity)
  DATA(lv_job_id) = cl_system_uuid=>create_uuid_c32_static( ).
  DATA(lo_tracker) = NEW zcl_progress_tracker(
    iv_job_id      = lv_job_id
    iv_total_count = lines( it_data ) ).
  " Processing
  LOOP AT it_data INTO DATA(ls_record).
    TRY.
        process_record( ls_record ).
        lo_tracker->increment( iv_success = abap_true ).
      CATCH cx_root INTO DATA(lx_error).
        lo_tracker->increment(
          iv_success = abap_false
          iv_message = lx_error->get_text( ) ).
    ENDTRY.
  ENDLOOP.
  " Finish
  lo_tracker->complete(
    iv_status  = 'COMPLETED'
    iv_message = |{ lo_tracker->get_progress( )-success } successful| ).
ENDMETHOD.

Error Handling and Restart

Idempotent Processing

Idempotency means that the same operation can be executed multiple times without changing the result.

METHOD process_record_idempotent.
  " Check whether the record has already been processed
  SELECT SINGLE processing_status FROM zcustomer
    WHERE customer_id = @is_customer-customer_id
    INTO @DATA(lv_status).
  IF lv_status = zif_constants=>c_status_completed.
    " Already processed - skip
    RETURN.
  ENDIF.
  " Set the status to "in process"
  " (sy-datum/sy-uzeit are not released in ABAP Cloud)
  DATA(lv_date) = cl_abap_context_info=>get_system_date( ).
  DATA(lv_time) = cl_abap_context_info=>get_system_time( ).
  UPDATE zcustomer SET
      processing_status = @zif_constants=>c_status_processing,
      processing_date   = @lv_date,
      processing_time   = @lv_time
    WHERE customer_id = @is_customer-customer_id.
  TRY.
      " The actual processing
      execute_business_logic( is_customer ).
      " Mark as successful
      UPDATE zcustomer SET
          processing_status = @zif_constants=>c_status_completed
        WHERE customer_id = @is_customer-customer_id.
    CATCH cx_root INTO DATA(lx_error).
      " Mark as failed for a later retry
      UPDATE zcustomer SET
          processing_status = @zif_constants=>c_status_error,
          error_message     = @( lx_error->get_text( ) )
        WHERE customer_id = @is_customer-customer_id.
      RAISE EXCEPTION lx_error.
  ENDTRY.
ENDMETHOD.

Retry Mechanism

CLASS zcl_retry_processor DEFINITION
  PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    CONSTANTS:
      c_max_retries     TYPE i VALUE 3,
      c_retry_delay_sec TYPE i VALUE 60.
    METHODS:
      process_with_retry
        IMPORTING is_record TYPE zrecord
        RAISING   zcx_processing_failed.
  PRIVATE SECTION.
    METHODS:
      execute_processing
        IMPORTING is_record TYPE zrecord,
      should_retry
        IMPORTING ix_error        TYPE REF TO cx_root
        RETURNING VALUE(rv_retry) TYPE abap_bool.
ENDCLASS.
CLASS zcl_retry_processor IMPLEMENTATION.
  METHOD process_with_retry.
    DATA lv_attempts TYPE i VALUE 0.
    WHILE lv_attempts < c_max_retries.
      lv_attempts = lv_attempts + 1.
      TRY.
          " Attempt the processing
          execute_processing( is_record ).
          RETURN. " Success!
        CATCH cx_root INTO DATA(lx_error).
          IF lv_attempts >= c_max_retries OR NOT should_retry( lx_error ).
            " Final failure
            RAISE EXCEPTION TYPE zcx_processing_failed
              EXPORTING
                previous = lx_error
                record   = is_record
                attempts = lv_attempts.
          ENDIF.
          " Wait before the retry (the delay grows with each attempt)
          DATA(lv_wait) = c_retry_delay_sec * lv_attempts.
          WAIT UP TO lv_wait SECONDS.
      ENDTRY.
    ENDWHILE.
  ENDMETHOD.
  METHOD should_retry.
    " Retry only for transient errors
    rv_retry = COND #(
      WHEN ix_error IS INSTANCE OF cx_sy_open_sql_db THEN abap_true " DB error
      WHEN ix_error IS INSTANCE OF cx_http_timeout   THEN abap_true " timeout (adapt to your scenario)
      ELSE abap_false ). " do not repeat business errors
  ENDMETHOD.
ENDCLASS.

Restart Capability

METHOD restart_failed_processing.
  " Reset failed records with a single set-based UPDATE:
  " increment the retry counter and set the status back to pending
  UPDATE zcustomer SET
      retry_count       = retry_count + 1,
      processing_status = @zif_constants=>c_status_pending
    WHERE processing_status = @zif_constants=>c_status_error
      AND retry_count < @c_max_retries.
  COMMIT WORK.
  " Start the normal processing again
  process_pending_records( ).
ENDMETHOD.

Performance Metrics and Monitoring

Capturing Metrics

CLASS zcl_processing_metrics DEFINITION
  PUBLIC FINAL CREATE PUBLIC.
  PUBLIC SECTION.
    TYPES:
      BEGIN OF ty_metrics,
        job_id           TYPE sysuuid_c32,
        start_time       TYPE timestampl,
        end_time         TYPE timestampl,
        total_records    TYPE i,
        records_per_sec  TYPE decfloat16,
        avg_time_per_rec TYPE decfloat16,
        memory_peak_mb   TYPE i,
        error_rate       TYPE decfloat16,
      END OF ty_metrics.
    METHODS:
      start_measurement,
      record_processed,
      stop_measurement
        RETURNING VALUE(rs_metrics) TYPE ty_metrics.
  PRIVATE SECTION.
    DATA:
      mv_start_time   TYPE timestampl,
      mv_record_count TYPE i,
      mv_memory_peak  TYPE i.
ENDCLASS.
CLASS zcl_processing_metrics IMPLEMENTATION.
  METHOD start_measurement.
    GET TIME STAMP FIELD mv_start_time.
    mv_record_count = 0.
    " Initial memory reading
    CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE'
      IMPORTING
        peak_memory = mv_memory_peak.
  ENDMETHOD.
  METHOD record_processed.
    mv_record_count = mv_record_count + 1.
    " Update the memory peak
    DATA lv_current_peak TYPE i.
    CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE'
      IMPORTING
        peak_memory = lv_current_peak.
    IF lv_current_peak > mv_memory_peak.
      mv_memory_peak = lv_current_peak.
    ENDIF.
  ENDMETHOD.
  METHOD stop_measurement.
    DATA: lv_end_time TYPE timestampl,
          lv_duration TYPE decfloat16.
    GET TIME STAMP FIELD lv_end_time.
    " Duration in seconds
    lv_duration = cl_abap_tstmp=>subtract(
      tstmp1 = lv_end_time
      tstmp2 = mv_start_time ).
    rs_metrics = VALUE #(
      start_time       = mv_start_time
      end_time         = lv_end_time
      total_records    = mv_record_count
      records_per_sec  = COND #( WHEN lv_duration > 0
                                 THEN mv_record_count / lv_duration
                                 ELSE 0 )
      avg_time_per_rec = COND #( WHEN mv_record_count > 0
                                 THEN lv_duration / mv_record_count * 1000 " in ms
                                 ELSE 0 )
      memory_peak_mb   = mv_memory_peak / 1024 / 1024 ).
  ENDMETHOD.
ENDCLASS.

Monitoring Dashboard (CDS View)

@EndUserText.label: 'Processing Job Status'
@Analytics.query: true
define view entity ZI_ProcessingStatus
  as select from zprogress_log
{
  key job_id,
      timestamp,
      total_count,
      processed,
      success,
      errors,
      // Note: division raises a runtime error for a zero divisor
      division( processed * 100, total_count, 2 ) as progress_percent,
      division( errors * 100, processed, 2 )      as error_rate,
      status,
      message,
      @Semantics.systemDateTime.lastChangedAt: true
      timestamp as last_update
}

Best Practices Summary

Do's

Practice                  | Rationale
Process in packages       | Controlled memory consumption
COMMIT after each package | Restart capability
Track progress            | Transparency for users
Implement idempotently    | Safe repetition
Capture metrics           | Performance optimization
bgPF for parallelization  | Cloud-native scaling

Don'ts

Anti-pattern                            | Problem
Loading all data at once                | Memory overflow
No COMMIT during the run                | Long locks, no restart
Synchronous processing of large volumes | Timeouts
Ignoring errors                         | Inconsistent data
No monitoring                           | Flying blind

Conclusion

Mass data processing in ABAP Cloud requires rethinking classic ABAP patterns. With packaging, progress tracking, and the Background Processing Framework, even millions of records can be processed reliably and with good performance.