Traitement de données de masse - Best Practices dans ABAP Cloud

Catégorie
Best Practices
Publié
Auteur
Johannes

Le traitement de grandes quantités de données présente des défis particuliers dans ABAP Cloud. Les limites de mémoire, les limites de timeout et l’architecture Cloud nécessitent des stratégies bien pensées. Cet article présente des patterns éprouvés pour un traitement efficace des données de masse.

Défis avec les données de masse

Problèmes typiques

ProblèmeCauseImpact
Memory OverflowCharger toutes les données en une foisInterruption du programme
TimeoutDurée d’exécution trop longueHTTP 504 Gateway Timeout
Lock ContentionAccès parallèles aux mêmes donnéesChute de performance
IncohérenceInterruption pendant le traitementDonnées partiellement traitées
Manque de transparencePas de suivi de statutUtilisateurs dans le flou

Limites spécifiques au Cloud

Dans SAP BTP ABAP Environment, des limites de ressources strictes s’appliquent :

Work Process Timeout: 600 secondes (max)
Mémoire par Dialog WP: ~2 GB
Extended Memory: Limité par session
Connexions DB: Basé sur pool

Batch Processing avec packages

Principe de base : Diviser pour régner

Au lieu de traiter toutes les données en une fois, elles sont divisées en packages gérables (Chunks) :

CLASS zcl_mass_processor DEFINITION
PUBLIC FINAL CREATE PUBLIC.
PUBLIC SECTION.
CONSTANTS:
c_package_size TYPE i VALUE 1000.
METHODS:
process_all_customers
RETURNING VALUE(rs_result) TYPE ztt_processing_result.
PRIVATE SECTION.
METHODS:
get_customer_count
RETURNING VALUE(rv_count) TYPE i,
get_customer_package
IMPORTING iv_offset TYPE i
iv_limit TYPE i
RETURNING VALUE(rt_customers) TYPE ztt_customers,
process_customer_package
IMPORTING it_customers TYPE ztt_customers
RETURNING VALUE(rt_results) TYPE ztt_processing_result.
ENDCLASS.
CLASS zcl_mass_processor IMPLEMENTATION.
METHOD process_all_customers.
DATA: lv_offset TYPE i VALUE 0.
" Déterminer le nombre total
DATA(lv_total) = get_customer_count( ).
" Traiter par packages
WHILE lv_offset < lv_total.
" Charger le package
DATA(lt_customers) = get_customer_package(
iv_offset = lv_offset
iv_limit = c_package_size
).
" Traiter le package
DATA(lt_package_results) = process_customer_package( lt_customers ).
" Collecter les résultats
APPEND LINES OF lt_package_results TO rs_result.
" Après chaque package : COMMIT WORK
COMMIT WORK AND WAIT.
" Augmenter l'offset
lv_offset = lv_offset + c_package_size.
" Libérer la mémoire
CLEAR lt_customers.
ENDWHILE.
ENDMETHOD.
METHOD get_customer_count.
SELECT COUNT(*) FROM zcustomer INTO @rv_count
WHERE processing_status = @zif_constants=>c_status_pending.
ENDMETHOD.
METHOD get_customer_package.
SELECT * FROM zcustomer
WHERE processing_status = @zif_constants=>c_status_pending
ORDER BY customer_id
OFFSET @iv_offset
UP TO @iv_limit ROWS
INTO TABLE @rt_customers.
ENDMETHOD.
METHOD process_customer_package.
LOOP AT it_customers INTO DATA(ls_customer).
TRY.
" Exécuter la logique métier
DATA(ls_result) = process_single_customer( ls_customer ).
APPEND ls_result TO rt_results.
CATCH cx_root INTO DATA(lx_error).
" Enregistrer l'erreur mais continuer
APPEND VALUE #(
customer_id = ls_customer-customer_id
status = 'ERROR"
message = lx_error->get_text( )
) TO rt_results.
ENDTRY.
ENDLOOP.
ENDMETHOD.
ENDCLASS.

Taille de package optimale

La taille idéale du package dépend de plusieurs facteurs :

FacteurPetits packages (100-500)Grands packages (1000-5000)
MémoireMoins de consommationPlus de consommation
OverheadPlus de roundtrips DBMoins d’overhead
RestartMoins de perte de donnéesPlus de perte de données
ParallélisationMieux distribuableMoins distribuable

Recommandation : Commencez avec 1000 enregistrements et ajustez selon les mesures.

Parallélisation avec BGPF

Pour une performance maximale, les packages peuvent être traités en parallèle. Le Background Processing Framework (bgPF) offre une solution élégante pour cela.

Architecture de traitement parallèle

┌─────────────────────────────────────────────────────┐
│ Main Process │
│ ┌─────────────────────────────────────────────────┐│
│ │ 1. Diviser les données en packages ││
│ │ 2. Démarrer des jobs bgPF pour chaque package ││
│ │ 3. Attendre la completion ││
│ │ 4. Agréger les résultats ││
│ └─────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│Worker 1│ │Worker 2│ │Worker 3│ │Worker 4│
│Pack 1 │ │Pack 2 │ │Pack 3 │ │Pack 4 │
└────────┘ └────────┘ └────────┘ └────────┘

Implémentation avec bgPF

CLASS zcl_parallel_processor DEFINITION
PUBLIC FINAL CREATE PUBLIC.
PUBLIC SECTION.
INTERFACES if_bgmc_op_single_tx_uncontr.
TYPES:
BEGIN OF ty_job_params,
package_id TYPE i,
offset TYPE i,
limit TYPE i,
total_count TYPE i,
END OF ty_job_params.
METHODS:
start_parallel_processing
IMPORTING iv_total_records TYPE i
iv_package_size TYPE i DEFAULT 1000
iv_max_parallelism TYPE i DEFAULT 4
RETURNING VALUE(rv_job_id) TYPE sysuuid_c32,
execute_package
IMPORTING is_params TYPE ty_job_params.
PRIVATE SECTION.
DATA ms_params TYPE ty_job_params.
ENDCLASS.
CLASS zcl_parallel_processor IMPLEMENTATION.
METHOD start_parallel_processing.
DATA: lt_jobs TYPE TABLE OF REF TO if_bgmc_op_single_tx_uncontr,
lv_offset TYPE i VALUE 0,
lv_package TYPE i VALUE 1.
" Définir les packages et démarrer les jobs
WHILE lv_offset < iv_total_records.
" Paramètres du job
DATA(ls_params) = VALUE ty_job_params(
package_id = lv_package
offset = lv_offset
limit = iv_package_size
total_count = iv_total_records
).
" Créer un nouveau processeur pour ce package
DATA(lo_processor) = NEW zcl_parallel_processor( ).
lo_processor->ms_params = ls_params.
" Enregistrer le job
APPEND lo_processor TO lt_jobs.
lv_offset = lv_offset + iv_package_size.
lv_package = lv_package + 1.
" Limiter le parallélisme
IF lines( lt_jobs ) >= iv_max_parallelism.
" Démarrer le batch et attendre
start_job_batch( lt_jobs ).
CLEAR lt_jobs.
ENDIF.
ENDWHILE.
" Démarrer les jobs restants
IF lt_jobs IS NOT INITIAL.
start_job_batch( lt_jobs ).
ENDIF.
ENDMETHOD.
METHOD if_bgmc_op_single_tx_uncontr~execute.
" Exécuté dans le Background Worker
execute_package( ms_params ).
ENDMETHOD.
METHOD execute_package.
" Charger le package depuis la DB
SELECT * FROM zcustomer
WHERE processing_status = @zif_constants=>c_status_pending
ORDER BY customer_id
OFFSET @is_params-offset
UP TO @is_params-limit ROWS
INTO TABLE @DATA(lt_customers).
" Traitement
LOOP AT lt_customers INTO DATA(ls_customer).
" Logique métier
process_single_customer( ls_customer ).
ENDLOOP.
" Commit au sein du Background Job
COMMIT WORK.
ENDMETHOD.
ENDCLASS.

Démarrer un job bgPF

METHOD start_job_batch.
DATA(lo_factory) = cl_bgmc_process_factory=>get_default( ).
LOOP AT it_jobs INTO DATA(lo_job).
TRY.
" Enregistrer l'opération
lo_factory->create( )->set(
iv_operation_type = 'MASS_PROCESS"
io_operation = lo_job
)->schedule( ).
CATCH cx_bgmc INTO DATA(lx_error).
" Gestion des erreurs
log_error( lx_error->get_text( ) ).
ENDTRY.
ENDLOOP.
ENDMETHOD.

Traitement économe en mémoire

Streaming au lieu de Bulk-Load

Au lieu de charger toutes les données, vous pouvez traiter avec un curseur :

METHOD process_with_cursor.
" Ouvrir le curseur
DATA: lt_buffer TYPE STANDARD TABLE OF zcustomer.
SELECT * FROM zcustomer
WHERE processing_status = @zif_constants=>c_status_pending
INTO TABLE @lt_buffer
PACKAGE SIZE 500.
" Traiter le buffer
LOOP AT lt_buffer INTO DATA(ls_customer).
process_single_customer( ls_customer ).
ENDLOOP.
" Commit après chaque package
COMMIT WORK AND WAIT.
" Libérer explicitement la mémoire
CLEAR lt_buffer.
ENDSELECT.
ENDMETHOD.

Surveillance de la mémoire

METHOD check_memory_usage.
" Vérifier la consommation mémoire actuelle
DATA: lv_used TYPE i,
lv_peak TYPE i,
lv_limit TYPE i.
CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE"
IMPORTING
used_memory = lv_used
peak_memory = lv_peak.
" Avertissement à > 70% d'utilisation
IF lv_used > lv_limit * 7 / 10.
log_warning( |Utilisation mémoire élevée : { lv_used } / { lv_limit }| ).
ENDIF.
" En cas d'utilisation critique, faire une pause
IF lv_used > lv_limit * 9 / 10.
" Forcer le Garbage Collection
cl_abap_memory_utilities=>do_garbage_collection( ).
" Attendre brièvement
WAIT UP TO 1 SECONDS.
ENDIF.
ENDMETHOD.

Éviter les données inutiles

" ❌ Mauvais : Charger tous les champs
SELECT * FROM zcustomer INTO TABLE @DATA(lt_all).
" ✓ Bon : Uniquement les champs nécessaires
SELECT customer_id, customer_name, email
FROM zcustomer
INTO TABLE @DATA(lt_minimal).
" ✓ Meilleur : Agrégation dans la base de données
SELECT country, COUNT(*) AS customer_count
FROM zcustomer
GROUP BY country
INTO TABLE @DATA(lt_summary).

Suivi de progression et logging

Table de progression

" Sauvegarder la progression dans une table de base de données
DEFINE TABLE zprogress_log {
key client : abap.clnt;
key job_id : sysuuid_c32;
key timestamp : timestampl;
total_count : i;
processed : i;
success : i;
errors : i;
status : abap.char(10);
message : abap.string(1000);
}

Classe Progress Tracker

CLASS zcl_progress_tracker DEFINITION
PUBLIC FINAL CREATE PUBLIC.
PUBLIC SECTION.
METHODS:
constructor
IMPORTING iv_job_id TYPE sysuuid_c32
iv_total_count TYPE i,
increment
IMPORTING iv_success TYPE abap_bool DEFAULT abap_true
iv_message TYPE string OPTIONAL,
complete
IMPORTING iv_status TYPE string DEFAULT 'COMPLETED"
iv_message TYPE string OPTIONAL,
get_progress
RETURNING VALUE(rs_progress) TYPE zprogress_log.
PRIVATE SECTION.
DATA:
mv_job_id TYPE sysuuid_c32,
mv_total TYPE i,
mv_processed TYPE i,
mv_success TYPE i,
mv_errors TYPE i,
mv_last_update TYPE timestampl.
METHODS:
persist_progress.
ENDCLASS.
CLASS zcl_progress_tracker IMPLEMENTATION.
METHOD constructor.
mv_job_id = iv_job_id.
mv_total = iv_total_count.
" Créer une entrée initiale
GET TIME STAMP FIELD mv_last_update.
INSERT INTO zprogress_log VALUES @(
VALUE #(
job_id = mv_job_id
timestamp = mv_last_update
total_count = mv_total
status = 'RUNNING"
)
).
COMMIT WORK.
ENDMETHOD.
METHOD increment.
mv_processed = mv_processed + 1.
IF iv_success = abap_true.
mv_success = mv_success + 1.
ELSE.
mv_errors = mv_errors + 1.
ENDIF.
" Ne pas mettre à jour à chaque enregistrement (performance)
IF mv_processed MOD 100 = 0.
persist_progress( ).
ENDIF.
ENDMETHOD.
METHOD persist_progress.
GET TIME STAMP FIELD mv_last_update.
UPDATE zprogress_log SET
processed = @mv_processed,
success = @mv_success,
errors = @mv_errors,
timestamp = @mv_last_update
WHERE job_id = @mv_job_id.
COMMIT WORK.
ENDMETHOD.
METHOD complete.
GET TIME STAMP FIELD mv_last_update.
UPDATE zprogress_log SET
processed = @mv_processed,
success = @mv_success,
errors = @mv_errors,
status = @iv_status,
message = @iv_message,
timestamp = @mv_last_update
WHERE job_id = @mv_job_id.
COMMIT WORK.
ENDMETHOD.
METHOD get_progress.
SELECT SINGLE * FROM zprogress_log
WHERE job_id = @mv_job_id
INTO @rs_progress.
ENDMETHOD.
ENDCLASS.

Intégration dans le traitement

METHOD process_with_tracking.
" Initialiser le tracker
DATA(lv_job_id) = cl_system_uuid=>create_uuid_c32_static( ).
DATA(lo_tracker) = NEW zcl_progress_tracker(
iv_job_id = lv_job_id
iv_total_count = lines( it_data )
).
" Traitement
LOOP AT it_data INTO DATA(ls_record).
TRY.
process_record( ls_record ).
lo_tracker->increment( iv_success = abap_true ).
CATCH cx_root INTO DATA(lx_error).
lo_tracker->increment(
iv_success = abap_false
iv_message = lx_error->get_text( )
).
ENDTRY.
ENDLOOP.
" Finalisation
lo_tracker->complete(
iv_status = 'COMPLETED"
iv_message = |{ lo_tracker->get_progress( )-success } avec succès|
).
ENDMETHOD.

Gestion des erreurs et restart

Traitement idempotent

L’idempotence signifie : La même opération peut être exécutée plusieurs fois sans changer le résultat.

METHOD process_record_idempotent.
" Vérifier si déjà traité
SELECT SINGLE processing_status FROM zcustomer
WHERE customer_id = @is_customer-customer_id
INTO @DATA(lv_status).
IF lv_status = zif_constants=>c_status_completed.
" Déjà traité - ignorer
RETURN.
ENDIF.
" Statut sur "En cours"
UPDATE zcustomer SET
processing_status = @zif_constants=>c_status_processing,
processing_date = @sy-datum,
processing_time = @sy-uzeit
WHERE customer_id = @is_customer-customer_id.
TRY.
" Traitement proprement dit
execute_business_logic( is_customer ).
" Marquer comme succès
UPDATE zcustomer SET
processing_status = @zif_constants=>c_status_completed
WHERE customer_id = @is_customer-customer_id.
CATCH cx_root INTO DATA(lx_error).
" Marquer comme erreur pour retry
UPDATE zcustomer SET
processing_status = @zif_constants=>c_status_error,
error_message = @lx_error->get_text( )
WHERE customer_id = @is_customer-customer_id.
RAISE EXCEPTION lx_error.
ENDTRY.
ENDMETHOD.

Mécanisme de retry

CLASS zcl_retry_processor DEFINITION
PUBLIC FINAL CREATE PUBLIC.
PUBLIC SECTION.
CONSTANTS:
c_max_retries TYPE i VALUE 3,
c_retry_delay_sec TYPE i VALUE 60.
METHODS:
process_with_retry
IMPORTING is_record TYPE zrecord
RAISING zcx_processing_failed.
PRIVATE SECTION.
METHODS:
should_retry
IMPORTING ix_error TYPE REF TO cx_root
RETURNING VALUE(rv_retry) TYPE abap_bool.
ENDCLASS.
CLASS zcl_retry_processor IMPLEMENTATION.
METHOD process_with_retry.
DATA: lv_attempts TYPE i VALUE 0.
WHILE lv_attempts < c_max_retries.
lv_attempts = lv_attempts + 1.
TRY.
" Tenter le traitement
execute_processing( is_record ).
RETURN. " Succès !
CATCH cx_root INTO DATA(lx_error).
IF lv_attempts >= c_max_retries OR NOT should_retry( lx_error ).
" Erreur définitive
RAISE EXCEPTION TYPE zcx_processing_failed
EXPORTING
previous = lx_error
record = is_record
attempts = lv_attempts.
ENDIF.
" Attendre avant retry (exponential backoff)
DATA(lv_wait) = c_retry_delay_sec * lv_attempts.
WAIT UP TO lv_wait SECONDS.
ENDTRY.
ENDWHILE.
ENDMETHOD.
METHOD should_retry.
" Retry uniquement pour les erreurs temporaires
rv_retry = COND #(
WHEN ix_error IS INSTANCE OF cx_sy_open_sql_db THEN abap_true " Erreur DB
WHEN ix_error IS INSTANCE OF cx_http_timeout THEN abap_true " Timeout
ELSE abap_false " Ne pas répéter les erreurs fonctionnelles
).
ENDMETHOD.
ENDCLASS.

Capacité de restart

METHOD restart_failed_processing.
" Déterminer les enregistrements en échec
SELECT customer_id FROM zcustomer
WHERE processing_status = @zif_constants=>c_status_error
AND retry_count < @c_max_retries
INTO TABLE @DATA(lt_failed).
LOOP AT lt_failed INTO DATA(ls_failed).
" Augmenter le compteur de retry
UPDATE zcustomer SET
retry_count = retry_count + 1,
processing_status = @zif_constants=>c_status_pending
WHERE customer_id = @ls_failed-customer_id.
ENDLOOP.
COMMIT WORK.
" Démarrer le traitement normal
process_pending_records( ).
ENDMETHOD.

Métriques de performance et monitoring

Capturer les métriques

CLASS zcl_processing_metrics DEFINITION
PUBLIC FINAL CREATE PUBLIC.
PUBLIC SECTION.
TYPES:
BEGIN OF ty_metrics,
job_id TYPE sysuuid_c32,
start_time TYPE timestampl,
end_time TYPE timestampl,
total_records TYPE i,
records_per_sec TYPE decfloat16,
avg_time_per_rec TYPE decfloat16,
memory_peak_mb TYPE i,
error_rate TYPE decfloat16,
END OF ty_metrics.
METHODS:
start_measurement,
record_processed,
stop_measurement
RETURNING VALUE(rs_metrics) TYPE ty_metrics.
PRIVATE SECTION.
DATA:
mv_start_time TYPE timestampl,
mv_record_count TYPE i,
mv_memory_peak TYPE i.
ENDCLASS.
CLASS zcl_processing_metrics IMPLEMENTATION.
METHOD start_measurement.
GET TIME STAMP FIELD mv_start_time.
mv_record_count = 0.
" Lecture mémoire initiale
CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE"
IMPORTING peak_memory = mv_memory_peak.
ENDMETHOD.
METHOD record_processed.
mv_record_count = mv_record_count + 1.
" Actualiser le pic mémoire
DATA lv_current_peak TYPE i.
CALL FUNCTION 'SYSTEM_GET_MEMORY_STATE"
IMPORTING peak_memory = lv_current_peak.
IF lv_current_peak > mv_memory_peak.
mv_memory_peak = lv_current_peak.
ENDIF.
ENDMETHOD.
METHOD stop_measurement.
DATA: lv_end_time TYPE timestampl,
lv_duration TYPE decfloat16.
GET TIME STAMP FIELD lv_end_time.
" Durée en secondes
lv_duration = cl_abap_tstmp=>subtract(
tstmp1 = lv_end_time
tstmp2 = mv_start_time
).
rs_metrics = VALUE #(
start_time = mv_start_time
end_time = lv_end_time
total_records = mv_record_count
records_per_sec = COND #(
WHEN lv_duration > 0
THEN mv_record_count / lv_duration
ELSE 0
)
avg_time_per_rec = COND #(
WHEN mv_record_count > 0
THEN lv_duration / mv_record_count * 1000 " en ms
ELSE 0
)
memory_peak_mb = mv_memory_peak / 1024 / 1024
).
ENDMETHOD.
ENDCLASS.

Tableau de bord de monitoring (CDS View)

@EndUserText.label: 'Statut des jobs de traitement"
@Analytics.query: true
define view entity ZI_ProcessingStatus
as select from zprogress_log
{
key job_id,
timestamp,
total_count,
processed,
success,
errors,
@Semantics.amount.currencyCode: 'percent_currency"
cast( processed as abap.dec(5,2) ) / total_count * 100 as progress_percent,
cast( errors as abap.dec(5,2) ) / processed * 100 as error_rate,
status,
message,
@Semantics.systemDateTime.lastChangedAt: true
timestamp as last_update
}

Résumé des Best Practices

À faire

PratiqueJustification
Traiter par packagesConsommation mémoire contrôlée
COMMIT après chaque packageCapacité de restart
Suivre la progressionTransparence pour l’utilisateur
Implémenter idempotentRépétition sûre
Capturer les métriquesOptimisation de performance
bgPF pour parallélisationScaling Cloud-native

À ne pas faire

Anti-PatternProblème
Charger toutes les données en une foisMemory Overflow
Pas de COMMIT pendant l’exécutionVerrous longs, pas de restart
Traitement synchrone de grandes quantitésTimeouts
Ignorer les erreursDonnées incohérentes
Sans monitoringNavigation à l’aveugle

Sujets connexes

Le traitement de données de masse dans ABAP Cloud nécessite une nouvelle façon de penser par rapport aux patterns ABAP classiques. Avec le packaging, le suivi de progression et le Background Processing Framework, même des millions d’enregistrements peuvent être traités de manière fiable et performante.