insert haute fréquence dans cassandra avec java perd des données

insert haute fréquence dans cassandra avec java perd des données

J'ai 5 000 000 de requêtes d'insertion dans le fichier. Je veux les lire à partir du fichier et écrire sur Cassandra avec le pilote Java et la méthode executeAsync, dans une instruction en boucle comme le code suivant :

public static void main(String[] args) {
        FileReader fr = null;
        try {
            fr = new FileReader("the-file-name.txt");
            BufferedReader br = new BufferedReader(fr);
            String sCurrentLine;
            long time1 = System.currentTimeMillis();
            while ((sCurrentLine = br.readLine()) != null) {
                session.executeAsync(sCurrentLine);
            }

            System.out.println(System.currentTimeMillis() - time1);
            fr.close();
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    } 

ma définition de table est :

CREATE TABLE test.climate (
    city text,
    date text,
    time text,
    temprature int,
    PRIMARY KEY ((city, date), time)
) WITH CLUSTERING ORDER BY (time ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Mais après l'exécution du programme, le nombre de lignes dans le tableau est de 2 569 725

cqlsh:test> select count(*) from climate ;

 count
---------
 2569725

J'ai testé plus de 10 fois et à chaque fois le résultat de select count(*) était compris entre 2 400 00 et 2 600 000

Montrez la meilleure réponse

Avez-vous oublié de fermer la session et le cluster ?

Montrez-nous un exemple d'instruction d'insertion ? Souvent, les gens pensent qu'ils insèrent 5 millions de lignes, mais la clé de partition reste la même sur plusieurs lignes et seules les colonnes de clustering changent. Par exemple ici, vous pourriez avoir (ville, date) identique pour différentes valeurs de temps qui sont valides mais comptées comme une seule ligne. Aussi dans cqlsh la cohérence par défaut est ONE, changez-la en quorum local

Ajoutez une journalisation pour votre code pour savoir ce qui se passe : Futures.addCallback( session.executeAsync( sCurrentLine ), new FutureCallback<ResultSet>() { @Override public void onSuccess( ResultSet result ) { //ignore } @Override public void onFailure( Throwable t ) { t.printStackTrace(); } } ); Je suppose que vous verrez des exceptions concernant les délais d'attente ou les nœuds indisponibles

l'exception est : Tous les hôtes essayés pour la requête ont échoué (essayés : /127.0.0.1:9042 (com.datastax.driver.core.exceptions.BusyPoolException : [/127.0.0.1] Le pool est occupé (aucune connexion disponible et la file d'attente a atteint sa taille maximale 256))) @MikhailBaksheev

Merci @MikhailBaksheev. C'est du travail.

Vous émettez des insertions asynchrones plus rapidement qu'elles ne s'exécutent, de sorte qu'elles finissent par dépasser la taille de la file d'attente et échouent. Vous pouvez augmenter la taille de votre file d'attente, ce qui fonctionnerait, mais vous appliquez simplement une contre-pression à la mémoire au lieu de votre producteur et vous risquez toujours de vous heurter à un mur. Essayez de limiter les requêtes en cours comme :

public static void main2(String[] args) {
    FileReader fr = null;
    int permits = 256;
    Semaphore l = new Semaphore(permits);
    try {
        fr = new FileReader("the-file-name.txt");
        BufferedReader br = new BufferedReader(fr);
        String sCurrentLine;
        long time1 = System.currentTimeMillis();
        while ((sCurrentLine = br.readLine()) != null) {
            l.acquire();
            session.executeAsync(sCurrentLine)
                .addListener(()->l.release(), MoreExecutors.directExecutor());
        }
        l.acquire(permits);

        System.out.println(System.currentTimeMillis() - time1);
        fr.close();
        br.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Il fonctionnera probablement aussi vite, il suffit de trouver la bonne taille de sémaphore. Notez également le blocage jusqu'à ce que tous les permis aient été renvoyés (acquisition de max à la fin), sinon vous pouvez arrêter jvm avant que toutes les requêtes éventuellement en file d'attente aient été envoyées.

avertissement : je n'ai pas testé le code ci-dessus