Kohei Nozaki's blog 

JBatch examples: bulk loading from CSV file to database


Posted on Sunday May 24, 2015 at 03:07PM in JBatch


Bulk loading is a typical use case for batch applications. In this entry, I give an example of bulk loading from a CSV file.

There is a supplemental package named jberet-support, which contains many useful classes that implement ItemReader or ItemWriter for common use cases. In this entry, we’ll use CsvItemReader to read a CSV file and JdbcItemWriter to write the data to a database.

Setup

In this setup we’ll use WildFly 9.0.0.CR1.

First, we need a CSV file; any will do. We’ll use forex historical data, which can be downloaded from http://www.histdata.com/download-free-forex-historical-data/?/ascii/1-minute-bar-quotes/usdjpy/2015/4 . Download and unpack it, then put DAT_ASCII_USDJPY_M1_201504.csv somewhere in your environment. The file contains data like:

20150401 000000;119.566000;119.566000;119.551000;119.565000;0
20150401 000100;119.566000;119.581000;119.565000;119.579000;0
20150401 000200;119.581000;119.586000;119.581000;119.583000;0
...
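
The file has no header row. Judging from the table columns used later in this entry, each line appears to hold a timestamp, the open, high, low and close bid prices, and a volume, separated by semicolons. As a rough sketch under that assumption, one line can be picked apart in plain Java like this (the class name ForexLineSketch is hypothetical and is not part of the project):

```java
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of jberet-support.
final class ForexLineSketch {
    // Timestamp layout seen in the sample lines, e.g. "20150401 000100"
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("uuuuMMdd HHmmss");

    // Splits one line on ';' and returns the raw fields
    static String[] fields(String line) {
        return line.split(";");
    }

    // Field 0 is the bar timestamp
    static LocalDateTime timestamp(String line) {
        return LocalDateTime.parse(fields(line)[0], FORMAT);
    }

    // Fields 1 to 4 are assumed to be open, high, low and close
    static BigDecimal price(String line, int index) {
        return new BigDecimal(fields(line)[index]);
    }

    public static void main(String[] args) {
        String line = "20150401 000100;119.566000;119.581000;119.565000;119.579000;0";
        System.out.println(timestamp(line));
        System.out.println(price(line, 2));
    }
}
```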

Next, define a JTA datasource on WildFly. The following is an example command that defines an H2 datasource using jboss-cli:

data-source add \
 --name=MyDS \
 --driver-name=h2 \
 --jndi-name=java:jboss/datasources/MyDS \
 --user-name=sa \
 --password=sa \
 --connection-url=jdbc:h2:/tmp/myds;AUTO_SERVER=TRUE

After confirming that the outcome is success, issue the following command to test a connection:

/subsystem=datasources/data-source=MyDS:test-connection-in-pool

Next, create a table to store the dataset. Issue the following command in the base directory of your WildFly instance to start the H2 shell:

java -cp ./modules/system/layers/base/com/h2database/h2/main/h2*.jar org.h2.tools.Shell -url "jdbc:h2:/tmp/myds;AUTO_SERVER=TRUE" -user sa -password sa

Execute the following DDL:

CREATE TABLE forex (
    symbol VARCHAR(6) NOT NULL,
    ts TIMESTAMP NOT NULL,
    bid_open NUMERIC(10,3) NOT NULL,
    bid_high NUMERIC(10,3) NOT NULL,
    bid_low NUMERIC(10,3) NOT NULL,
    bid_close NUMERIC(10,3) NOT NULL,
    volume INTEGER NOT NULL,
    PRIMARY KEY(symbol, ts)
);
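
Note that the sample lines carry six decimal places per price, while the price columns are declared as NUMERIC(10,3). The sketch below, plain Java with a hypothetical class name, checks that reducing a sample value to three decimal places drops only trailing zeros. Whether that holds for every row in the file is an assumption, not something verified here.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper for illustration only.
final class PriceScaleSketch {
    // Reduces a price to scale 3, matching NUMERIC(10,3).
    // RoundingMode.UNNECESSARY makes setScale throw ArithmeticException
    // if any non-zero digit would have to be dropped.
    static BigDecimal toColumnScale(String raw) {
        return new BigDecimal(raw).setScale(3, RoundingMode.UNNECESSARY);
    }

    public static void main(String[] args) {
        System.out.println(toColumnScale("119.566000")); // prints 119.566
    }
}
```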

Next, create a batch application.

pom.xml

You need the following dependencies in your pom.xml:

<dependencies>
    <dependency>
        <groupId>javax</groupId>
        <artifactId>javaee-api</artifactId>
        <version>7.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.batchee</groupId>
        <artifactId>batchee-servlet-embedded</artifactId>
        <version>0.2-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.jberet</groupId>
        <artifactId>jberet-support</artifactId>
        <version>1.1.0.Final</version>
    </dependency>
    <dependency>
        <groupId>net.sf.supercsv</groupId>
        <artifactId>super-csv</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.5.3</version>
    </dependency>
</dependencies>

/src/main/resources/META-INF/batch-jobs/load-csv.xml

The job uses csvItemReader and jdbcItemWriter, which are supplied by the jberet-support package.

<job id="load-csv" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
    <step id="load">
        <chunk>
            <reader ref="csvItemReader">
                <properties>
                    <property name="resource" value="#{jobParameters['resource']}"/>
                    <property name="headerless" value="true"/>
                    <property name="delimiterChar" value=";"/>
                    <property name="beanType" value="java.util.List"/>
                </properties>
            </reader>
            <processor ref="forexItemProcessor">
                <properties>
                    <property name="symbol" value="#{jobParameters['symbol']}"/>
                </properties>
            </processor>
            <writer ref="jdbcItemWriter">
                <properties>
                    <property name="dataSourceLookup" value="java:jboss/datasources/MyDS"/>
                    <property name="sql"
                              value="INSERT INTO forex (symbol, ts, bid_open, bid_high, bid_low, bid_close, volume) values (?, ?, ?, ?, ?, ?, ?)"/>
                    <property name="beanType" value="java.util.List"/>
                </properties>
            </writer>
        </chunk>
    </step>
</job>
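
The chunk element above sets no item-count, so the JSR 352 default of 10 applies: items are read and processed one at a time, and the writer receives them in lists of up to 10. The following is a simplified plain-Java model of that read-process-write loop, not JBeret's actual implementation. ChunkLoopSketch is a hypothetical name, and Iterator and Function merely stand in for the real reader and processor; transactions and checkpoints are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of chunk-oriented processing, for illustration only.
final class ChunkLoopSketch {
    // Returns the lists that would be handed to the writer, one per chunk.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int itemCount) {
        List<List<O>> written = new ArrayList<>();
        List<O> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            // Read one item and process it immediately
            buffer.add(processor.apply(reader.next()));
            if (buffer.size() == itemCount) {
                // A real runtime would call the writer and commit here
                written.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            // Final, partially filled chunk
            written.add(buffer);
        }
        return written;
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = run(Arrays.asList(1, 2, 3, 4, 5).iterator(), x -> x * 2, 2);
        System.out.println(chunks); // prints [[2, 4], [6, 8], [10]]
    }
}
```

In this model, a smaller item count means more, smaller writes, while a larger one means fewer, bigger batches per write.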

/src/main/java/jbatch/ForexItemProcessor.java

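package jbatch;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemProcessor;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

// Converts one CSV row (a List of Strings) into the typed values for the INSERT statement.
@Named
@Dependent
public class ForexItemProcessor implements ItemProcessor {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("uuuuMMdd HHmmss");

    // Injected from the "symbol" property of the processor in load-csv.xml
    @Inject
    @BatchProperty
    private String symbol;

    @Override
    public Object processItem(final Object item) throws Exception {
        final List<?> items = (List<?>) item;
        // The order must match the placeholders in the writer's SQL
        return Arrays.asList(symbol,
                Timestamp.valueOf(LocalDateTime.parse((String) items.get(0), FORMATTER)),
                new BigDecimal((String) items.get(1)),
                new BigDecimal((String) items.get(2)),
                new BigDecimal((String) items.get(3)),
                new BigDecimal((String) items.get(4)),
                Integer.valueOf((String) items.get(5)));
    }
}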

Run the batch

For example, if you put the forex CSV at /tmp/DAT_ASCII_USDJPY_M1_201504.csv:

curl 'http://localhost:8080/jbatch-example-1.0-SNAPSHOT/jbatch/rest/start/load-csv?symbol=USDJPY&resource=/tmp/DAT_ASCII_USDJPY_M1_201504.csv'
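
If you would rather trigger the job from Java than from curl, the same request can be sketched with the JDK's own HTTP classes. This is only a minimal sketch: StartJobSketch is a hypothetical class, the base URL is copied from the curl example and assumes the same host and context root, and the query values are URL-encoded so that paths containing spaces or other special characters survive.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Hypothetical client for illustration only.
final class StartJobSketch {
    // Copied from the curl example; adjust host and context root as needed
    static final String BASE =
            "http://localhost:8080/jbatch-example-1.0-SNAPSHOT/jbatch/rest/start/load-csv";

    // Builds the start URL with both job parameters URL-encoded
    static String startUrl(String symbol, String resource) {
        return BASE + "?symbol=" + encode(symbol) + "&resource=" + encode(resource);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM
            throw new IllegalStateException(e);
        }
    }

    // Sends a GET request, as curl does by default, and returns the HTTP status code
    static int start(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL; call start(...) once the application is deployed
        System.out.println(startUrl("USDJPY", "/tmp/DAT_ASCII_USDJPY_M1_201504.csv"));
    }
}
```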

After the job is done, check the dataset in your database using the H2 CLI:

sql> select * from forex;
SYMBOL | TS                    | BID_OPEN | BID_HIGH | BID_LOW | BID_CLOSE | VOLUME
USDJPY | 2015-04-01 00:00:00.0 | 119.566  | 119.566  | 119.551 | 119.565   | 0
USDJPY | 2015-04-01 00:01:00.0 | 119.566  | 119.581  | 119.565 | 119.579   | 0
USDJPY | 2015-04-01 00:02:00.0 | 119.581  | 119.586  | 119.581 | 119.583   | 0
...
(31572 rows, 505 ms)

The project used in this entry can be obtained from my GitHub repository.


