la inserción de alta frecuencia en cassandra con java pierde algunos d

la inserción de alta frecuencia en cassandra con java pierde algunos datos

Tengo 5 000 000 de consultas de inserción en el archivo. Quiero leerlos desde el archivo y escribir en Cassandra con el controlador Java y el método executeAsync, en una declaración de bucle como el siguiente código:

public static void main(String[] args) {
        FileReader fr = null;
        try {
            fr = new FileReader("the-file-name.txt");
            BufferedReader br = new BufferedReader(fr);
            String sCurrentLine;
            long time1 = System.currentTimeMillis();
            while ((sCurrentLine = br.readLine()) != null) {
                session.executeAsync(sCurrentLine);
            }

            System.out.println(System.currentTimeMillis() - time1);
            fr.close();
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    } 

la definición de mi tabla es:

CREATE TABLE test.climate (
    city text,
    date text,
    time text,
    temprature int,
    PRIMARY KEY ((city, date), time)
) WITH CLUSTERING ORDER BY (time ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Pero después de ejecutar el programa, el recuento de filas en la tabla es 2 569 725

cqlsh:test> select count(*) from climate ;

 count
---------
 2569725

Probé más de 10 veces y cada vez el resultado del conteo seleccionado (*) fue entre 2,400,00 y 2,600,000

Mostrar la mejor respuesta

¿Olvidaste cerrar la sesión y el clúster?

¿Mostrarnos una declaración de inserción de muestra? Muchas veces las personas piensan que insertan filas de 5M, pero la clave de partición sigue siendo la misma en varias filas y solo cambia la columna de agrupación. Por ejemplo, aquí puede tener (ciudad, fecha) lo mismo para diferentes valores de tiempo, lo cual es válido pero se cuenta como una sola fila. También en cqlsh la consistencia predeterminada es UNO, cámbielo a quórum local

Agregue el registro de su código para saber qué sucede: Futures.addCallback( session.executeAsync( sCurrentLine ), new FutureCallback<ResultSet>() { @Override public void onSuccess( ResultSet result ) { //ignore } @Override public void onFailure( Throwable t ) { t.printStackTrace(); } } ); Supongo que verá excepciones sobre tiempos de espera o nodos no disponibles

la excepción es: todos los hosts que se intentaron para la consulta fallaron (intentados: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] El grupo está ocupado (no hay conexión disponible y la cola ha alcanzado su tamaño máximo 256))) @MikhailBaksheev

Gracias @MikhailBaksheev. Es trabajo.

Está emitiendo inserciones asíncronas más rápido de lo que se ejecutan, por lo que eventualmente superan el tamaño de la cola y fallan. Puede aumentar el tamaño de la cola, lo que funcionaría, pero luego solo está aplicando presión de retroceso a la memoria en lugar de a su productor y aún posiblemente chocando contra una pared. Intente limitar las consultas en curso como:

public static void main2(String[] args) {
    FileReader fr = null;
    int permits = 256;
    Semaphore l = new Semaphore(permits);
    try {
        fr = new FileReader("the-file-name.txt");
        BufferedReader br = new BufferedReader(fr);
        String sCurrentLine;
        long time1 = System.currentTimeMillis();
        while ((sCurrentLine = br.readLine()) != null) {
            l.acquire();
            session.executeAsync(sCurrentLine)
                .addListener(()->l.release(), MoreExecutors.directExecutor());
        }
        l.acquire(permits);

        System.out.println(System.currentTimeMillis() - time1);
        fr.close();
        br.close();
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Es probable que se ejecute igual de rápido, solo necesita encontrar el tamaño correcto del semáforo. También tenga en cuenta el bloqueo hasta que se hayan devuelto todos los permisos (adquiriendo el máximo al final); de lo contrario, puede cerrar jvm antes de que se envíen todas las solicitudes que posiblemente estén en cola.

descargo de responsabilidad: no probé el código anterior