Paging
One way to work around this is to use paging: first select the first million rows, then the next million, and so on until all rows have been queried. Your first query will look something like the following:
SELECT * FROM myTable OFFSET 0 ROWS FETCH NEXT 1000000 ROWS ONLY;
while the next one will look something like the following:
SELECT * FROM myTable OFFSET 1000000 ROWS FETCH NEXT 1000000 ROWS ONLY;
This is slow for two reasons:
- You need to make sure all subsequent calls use a consistent ordering, so you have to add an ORDER BY clause on your primary key to the query. You could also order by something like ROWID in Oracle, or its equivalent in other databases.
- Paging basically runs the entire query up to the page you requested. The higher the page you request, the longer your query will take.
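To make the consistent-ordering point concrete, here is a minimal sketch that builds the query for each page. The class name, the `pageQuery` helper and the `id` column are assumptions made for illustration, and plain string concatenation is only acceptable here because the table and column names are fixed, not user input.

```java
public class PagedQueries {

    /**
     * Builds the query for one page. Every page uses the same ORDER BY and the
     * same page size, so consecutive pages neither overlap nor skip rows.
     */
    public static String pageQuery(String table, String orderColumn, long pageIndex, long pageSize) {
        long offset = pageIndex * pageSize; // number of rows to skip before this page starts
        return "SELECT * FROM " + table
                + " ORDER BY " + orderColumn
                + " OFFSET " + offset + " ROWS"
                + " FETCH NEXT " + pageSize + " ROWS ONLY";
    }

    public static void main(String[] args) {
        // Print the queries for the first three pages of one million rows each.
        for (long page = 0; page < 3; page++) {
            System.out.println(pageQuery("myTable", "id", page, 1_000_000L));
        }
    }
}
```

Only the offset changes from page to page. The database still has to sort and skip all the rows before the offset, which is why later pages get slower.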
Observables
Another way to avoid blowing up your heap is to return RxJava's Observable instead of a list with all your results. This way you stream the results from your query to the subscriber of your Observable without ever keeping all results in memory. This has two advantages:
- you don't need a lot of memory to hold all the results
- you don't have to wait until all results from the database are fetched. You can already start processing items the moment they are read from the database and passed to the subscriber.
I'm using spring-jdbc's JdbcTemplate or NamedParameterJdbcTemplate for querying the database. It has all sorts of methods for querying the database. The following code shows you how to return an Observable:
public <T> Observable<T> query(String sql, RowMapper<? extends T> rowMapper) {
    return Observable.create(subscriber ->
        this.jdbcTemplate.query(sql, extractResultToSubscriber(subscriber, rowMapper))
    );
}
It takes a SQL query and a RowMapper for converting each row to some type <T>, and returns an Observable. For every subscription, the Observable triggers the call to the database using a ResultSetExtractor. The ResultSetExtractor iterates over the ResultSet and passes the data for each row to the subscriber until there are no more rows or the subscriber has unsubscribed.
private static <T> ResultSetExtractor<Void> extractResultToSubscriber(Subscriber<T> subscriber, RowMapper<? extends T> rowMapper) {
    return rs -> {
        int rowNum = 0;
        while (rs.next() && !subscriber.isUnsubscribed()) {
            if (rowNum % 10000 == 0) log.debug("Streaming result #{} using {}", rowNum, rowMapper);
            subscriber.onNext(rowMapper.mapRow(rs, rowNum++));
        }
        log.debug("Ended streaming {} results using {}", rowNum, rowMapper);
        subscriber.onCompleted();
        return null;
    };
}
I can call the observable, subscribe to it and handle each row:
def "subscribing to an observable without backpressure succeeds using callstack blocking"() {
    given:
    def jdbcTemplate = new ObservableJdbcTemplateWithoutBackpressure(dataSource)
    def subscriber = new TestSubscriber(slowObserver)
    when:
    jdbcTemplate.query(query, new ColumnMapRowMapper())
        // .observeOn(Schedulers.io())
        .subscribe(subscriber)
    subscriber.awaitTerminalEvent()
    then:
    subscriber.assertNoErrors()
}
This works great... until you start doing things on another thread and your subscriber can't keep up with the items that are being produced:
def "when subscribing to an observable without backpressure all is fine as long as the consumer is faster than the producer"() {
    given:
    def jdbcTemplate = new ObservableJdbcTemplateWithoutBackpressure(dataSource)
    def subscriber = new TestSubscriber(slowObserver)
    when:
    jdbcTemplate.query(query, new ColumnMapRowMapper())
        .observeOn(Schedulers.io())
        .subscribe(subscriber)
    subscriber.awaitTerminalEvent()
    then:
    subscriber.assertError(MissingBackpressureException)
}
The result is a MissingBackpressureException that is thrown when the consumer can't keep up.
The problem is that we didn't take backpressure into account. You need it the moment you're no longer doing everything on one thread (callstack blocking) and have a consumer that is slower than your producer (the results from the db query).
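The failure mode can be illustrated without RxJava. The sketch below assumes, as a simplification, that handing items over to another thread goes through a bounded buffer, and that a producer which ignores demand keeps pushing while nothing drains that buffer. The class name, the `firstRejected` helper and the tiny capacity are made up for the example; this is not RxJava's actual implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class OverflowSketch {

    /**
     * Pushes `produced` items into a buffer with `capacity` slots while nothing
     * drains it. Returns the index of the first item that did not fit, or -1
     * if every item fit.
     */
    public static int firstRejected(int capacity, int produced) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < produced; i++) {
            // offer() never blocks: it returns false when the buffer is full.
            if (!buffer.offer(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // A slow consumer is modelled as "no consumer at all" to keep this deterministic.
        System.out.println("first rejected item: " + firstRejected(4, 10));
    }
}
```

In the streaming code above, the point where an item no longer fits is where the MissingBackpressureException surfaces: the producer has no way of learning that it should slow down.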
Backpressure
The solution is to refactor the above code into something that does know how to handle backpressure. Luckily, RxJava helps us a lot by abstracting away most of the difficulties with SyncOnSubscribe. The code is a bit more complicated because we let RxJava control the call. In return, it needs to know how to keep state during the subscription, so we need to tell it ...
- how to create a new state object
- what to do when a new item is requested
- how to clean up state
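The three callbacks listed above can be sketched in plain Java to show the idea of pull-based emission: the source only produces as many items as the consumer asks for. `Sink` and `PullSource` are hypothetical, simplified stand-ins invented for this example. They are not RxJava types, and the real SyncOnSubscribe handles more cases (errors, unsubscription, concurrency).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class PullSourceSketch {

    /** Simplified stand-in for an Observer (hypothetical). */
    public interface Sink<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Emits only as many items as the consumer requests. */
    public static final class PullSource<S, T> {
        private final S state;
        private final BiConsumer<S, Sink<T>> next;
        private final Consumer<S> cleanup;
        private final Sink<T> guard;
        private boolean done = false;

        public PullSource(Supplier<S> generator, BiConsumer<S, Sink<T>> next,
                          Consumer<S> cleanup, Sink<T> downstream) {
            this.state = generator.get(); // 1. create the state object once
            this.next = next;
            this.cleanup = cleanup;
            // Wrap the downstream sink so that completion is noticed here as well.
            this.guard = new Sink<T>() {
                @Override public void onNext(T item) { downstream.onNext(item); }
                @Override public void onCompleted() { done = true; downstream.onCompleted(); }
            };
        }

        public void request(long n) {
            if (done) return;
            for (long i = 0; i < n && !done; i++) {
                next.accept(state, guard); // 2. one call per requested item
            }
            if (done) cleanup.accept(state); // 3. clean up the state exactly once
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        PullSource<Iterator<String>, String> source = new PullSource<>(
                () -> Arrays.asList("a", "b", "c").iterator(),
                (it, sink) -> { if (it.hasNext()) sink.onNext(it.next()); else sink.onCompleted(); },
                it -> System.out.println("cleaned up"),
                new Sink<String>() {
                    @Override public void onNext(String item) { received.add(item); }
                    @Override public void onCompleted() { System.out.println("completed"); }
                });
        source.request(2); // the consumer decides how many items it can handle
        System.out.println(received);
        source.request(5); // asks for more than is left: the source completes and cleans up
        System.out.println(received);
    }
}
```

Because the consumer drives the loop through `request`, a slow consumer simply asks less often and nothing piles up in between.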
To start, we need to wrap the Connection, the DataSource, the PreparedStatement and the ResultSet in a state object called JdbcQueryState:
private class JdbcQueryState {
    public final ResultSet rs;
    private final Connection connection;
    private final PreparedStatementCreator psc;
    private final PreparedStatement ps;
    private final DataSource dataSource;
    private volatile int rowNum = 0;

    public JdbcQueryState(ResultSet rs, Connection connection, PreparedStatementCreator psc, PreparedStatement ps, DataSource dataSource) {
        this.rs = rs;
        this.connection = connection;
        this.psc = psc;
        this.ps = ps;
        this.dataSource = dataSource;
    }
}
Now, when a new item is requested, we get the next row from the ResultSet and pass it to the observer or signal the observer that it's complete when there are no rows left:
public <T> void handleNext(Observer<? super T> observer, RowMapper<T> rowMapper) {
    try {
        if (rs.next()) {
            if (rowNum % 10000 == 0) log.debug("Streaming result #{} using {}", rowNum, rowMapper);
            observer.onNext(rowMapper.mapRow(rs, rowNum++));
        } else {
            log.debug("Ended streaming {} results using {}", rowNum, rowMapper);
            observer.onCompleted();
        }
    } catch (SQLException e) {
        observer.onError(e);
    }
}
Finally, we need to clean up all state when it's done:
public void cleanup() {
    JdbcUtils.closeResultSet(rs);
    cleanupParameters(psc);
    cleanupParameters(ps);
    JdbcUtils.closeStatement(ps);
    DataSourceUtils.releaseConnection(connection, dataSource);
}
All this is called from methods that take the SQL and return a backpressure-aware Observable:
public <T> Observable<T> observableQuery(String sql, Map<String, ?> paramMap, RowMapper<T> rowMapper) {
    return Observable.create(SyncOnSubscribe.createSingleState(
        () -> createJdbcQueryState(sql, paramMap),
        (state, observer) -> state.handleNext(observer, rowMapper),
        JdbcQueryState::cleanup
    ));
}

public <T> Observable<T> observableQuery(String sql, RowMapper<T> rowMapper) {
    return Observable.create(SyncOnSubscribe.createSingleState(
        () -> createJdbcQueryState(sql),
        (state, observer) -> state.handleNext(observer, rowMapper),
        JdbcQueryState::cleanup
    ));
}
To hook into Spring's statement handling, I needed to extend NamedParameterJdbcTemplate. You can see the full code here.
Greetings
Jan