Product Import with Spring Batch - Part 1

I have two websites that regularly import products belonging to affiliate programs. The websites were developed in 2005 and 2006. While moving the applications from a Windows server to a Linux server, I decided to rewrite and modularize a lot of code that was duplicated or performed poorly. For the product import, I selected Spring Batch as the new framework. Spring Batch forces you to write small chunks of code instead of huge jobs; these chunks then make up the whole batch. There is also a ready-to-use monitoring tool, and I am already familiar with the standard Spring Framework.

Since the full batch might get complex later, I decided to publish my experiences here as I implement it. In this first part, I use Spring Batch to download a product data file (CSV) that belongs to an affiliate program. I build the application with Maven 2. For this part, only five dependencies are needed: Spring Batch of course (core and test), commons-io and commons-lang for some utility code, plus JUnit and log4j.


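<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kanzelbahn.utils</groupId>
    <artifactId>product-import</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>Product Import Library</name>

    <properties>
        <spring.batch.version>2.0.3.RELEASE</spring.batch.version>
        <!-- property name assumed; only the value 2.5.6 is preserved -->
        <spring.version>2.5.6</spring.version>
        <skipJunitTests>false</skipJunitTests>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>${skipJunitTests}</skip>
                    <argLine>-Xms128m -Xmx256m -XX:PermSize=128m -XX:MaxPermSize=256m</argLine>
                    <!-- one more option set to "false"; its element name is not preserved -->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <!-- two more options set to "false"; their element names are not preserved -->
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>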

pom.xml

One big disadvantage of Spring Batch 2.0 is that you cannot unit test it with TestNG. The problem lies in the Spring Framework rather than in Spring Batch. When you write a Spring-powered TestNG test, you need to extend AbstractTestNGSpringContextTests; there is no runner to use in the @RunWith annotation, like SpringJUnit4ClassRunner for JUnit. This alone is not a problem, but since a Spring Batch test also needs to extend AbstractJobTests, and a Java class cannot extend two classes, TestNG is out of the picture. This will be fixed in Spring Batch 2.1, because then you will no longer have to extend AbstractJobTests. I did not use version 2.1 because it had not been released at the time of writing this post.

Let's have a look at the job configuration.


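<beans:beans xmlns="http://www.springframework.org/schema/batch"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
                 http://www.springframework.org/schema/beans
                 http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                 http://www.springframework.org/schema/batch
                 http://www.springframework.org/schema/batch/spring-batch-2.0.xsd">

    <!-- The job, step, decision and bean definitions are not preserved in this copy of the listing. -->

</beans:beans>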
jobs.xinc

As you can see, the job has only three (well, three and a half) steps. Step 1 is implemented in the InitializingTasklet, and all it really does is log the start time. The next step is called csv_exists and is a decision step: if the CSV file for the current day exists, the job moves on to Step 3; otherwise Step 2 is invoked. Step 2 is implemented in the DownloaderTasklet, which downloads the CSV file of the current day. Step 3 is implemented in the FinishingTasklet and also performs basic logging. Let's look at the three Tasklets and the JobExecutionDecider.
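To make the control flow described above easier to follow, here is a minimal, framework-free sketch in plain Java. It does not use any Spring Batch classes; the class name `ImportFlowSketch`, the method `plan` and the step labels are made up for illustration. It also assumes that the job continues with Step 3 after the download in Step 2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the job flow; none of this is Spring Batch API.
class ImportFlowSketch {

    /** Returns the labels of the steps that would run, in order. */
    static List<String> plan(final boolean csvExistsForToday) {
        final List<String> steps = new ArrayList<String>();
        steps.add("step1:InitializingTasklet");

        // The decision is not a real step, hence "three and a half" steps.
        steps.add("decision:csv_exists");
        if (!csvExistsForToday) {
            // Only download when today's file is missing.
            steps.add("step2:DownloaderTasklet");
        }

        // Assumption: Step 3 always runs last, with or without a download.
        steps.add("step3:FinishingTasklet");
        return steps;
    }

    public static void main(final String[] args) {
        System.out.println(plan(true));
        System.out.println(plan(false));
    }
}
```

Running the job twice on the same day would therefore skip the download on the second run, which is the point of the decision step.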

The InitializingTasklet for Step 1 is pretty much self-explanatory. On a side note, see how I use FastDateFormat instead of SimpleDateFormat, because SimpleDateFormat is not thread-safe.


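import java.util.Date;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * Performs initialization tasks.
 *
 * @author reik.schatz Dec 11, 2009
 */
public class InitializingTasklet implements Tasklet {
    private static final Logger LOGGER = Logger.getLogger(InitializingTasklet.class);

    // FastDateFormat is thread-safe, so one shared instance is fine.
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
        LOGGER.debug("Initializing at " + DATE_FORMAT.format(new Date()) + ".");
        return RepeatStatus.FINISHED;
    }
}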
InitializingTasklet.java
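As a side note to the thread-safety remark: the project targets Java 1.5, where FastDateFormat is a sensible choice. On Java 8 or later (an assumption that does not hold for this project), the JDK's own `DateTimeFormatter` is immutable and can be shared between threads in the same way. A small sketch with a made-up class name `TimestampSketch`:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the import project; requires Java 8+.
class TimestampSketch {

    // DateTimeFormatter instances are immutable, so a shared constant is safe.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String format(final LocalDateTime time) {
        return FORMAT.format(time);
    }

    public static void main(final String[] args) {
        System.out.println("Initializing at " + format(LocalDateTime.now()) + ".");
    }
}
```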

The JobExecutionDecider for the csv_exists decision is implemented in a class called DoesCsvExistDecision. When constructing the bean, you need to specify a CsvFileFacade to handle access to the CSV file. CsvFileFacade is an interface, and my only implementation is the class CsvFileFacadeImpl. The implementation expects an ImportSettings instance upon creation. Everything is wired together by Spring. Through the ImportSettings instance, the CsvFileFacade knows the root directory to which the CSV files are downloaded, the affiliate program id, and the location of the CSV file on the Internet. I used the German affiliate program Bakker as a sample implementation.


import java.io.File;
import java.net.URL;

/**
 * A {@link CsvFileFacade} wraps the handling of csv files.
 *
 * @author reik.schatz Dec 11, 2009
 */
public interface CsvFileFacade {

    File getCsvFile();

    URL getDataFileURL();
}
CsvFileFacade.java


import java.io.File;
import java.net.URL;

import org.apache.log4j.Logger;

/**
 * A {@link CsvFileFacade} which retrieves information about the csv file
 * location from an {@link ImportSettings} instance.
 *
 * @author reik.schatz Dec 11, 2009
 */
public class CsvFileFacadeImpl implements CsvFileFacade {
    private static final Logger LOGGER = Logger.getLogger(CsvFileFacadeImpl.class);

    private final ImportSettings _settings;

    public CsvFileFacadeImpl(final ImportSettings settings) {
        _settings = settings;
    }

    public File getCsvFile() {
        // One file per affiliate program, named after the program id.
        final String fileName = _settings.getImportable().getProgramId() + ".csv";
        final File dailyDirectory = _settings.getDirectory();
        return new File(dailyDirectory, fileName);
    }

    public URL getDataFileURL() {
        return _settings.getImportable().getDataFile();
    }
}
CsvFileFacadeImpl.java


import java.io.File;

/**
 * Wraps all settings for the current import run.
 *
 * @author reik.schatz Dec 13, 2009
 */
public interface ImportSettings {

    /**
     * Gets the directory into which the data file shall be imported.
     *
     * @return File
     */
    File getDirectory();

    /**
     * Returns the {@link Importable} which shall be used.
     *
     * @return Importable
     */
    Importable getImportable();
}
ImportSettings.java


import java.io.File;
import java.io.IOException;
import java.util.Date;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.time.FastDateFormat;

/**
 * Encapsulates settings for a single import run.
 *
 * @author reik.schatz Dec 13, 2009
 */
public class StandardSettings implements ImportSettings {

    private final File _rootDirectory;
    private final Importable _importable;

    public StandardSettings(final File importDirectory, final Importable importable) {
        _importable = importable;

        if (importDirectory == null || !importDirectory.exists()) {
            final String path = importDirectory == null ? "" : importDirectory.getPath();
            throw new IllegalArgumentException("Given importDirectory (" + path + ") does not exist.");
        }

        _rootDirectory = importDirectory;
    }

    /** {@inheritDoc} */
    public File getDirectory() {
        // The daily directory is named after the current date, e.g. 2009-12-13.
        final Date now = new Date();
        final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd");
        final String day = df.format(now);

        final File importableDataFileDirectory = new File(_rootDirectory, day);
        if (!importableDataFileDirectory.exists()) {
            try {
                FileUtils.forceMkdir(importableDataFileDirectory);
            } catch (IOException e) {
                throw new IllegalStateException("Unable to create daily import directory (" + day + ")", e);
            }
        }
        return importableDataFileDirectory;
    }

    /** {@inheritDoc} */
    public Importable getImportable() {
        return _importable;
    }
}
StandardSettings.java
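Taken together, CsvFileFacadeImpl and StandardSettings place each CSV file at root directory / current day / program id + ".csv". The following sketch shows that layout as a pure function using only the JDK (Java 8+ assumed). The class name `CsvLocationSketch`, the directory name `imports` and the program id 1234 are made up for illustration, and unlike StandardSettings the sketch does not create the directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;

// Hypothetical illustration of the directory layout; not part of the project.
class CsvLocationSketch {

    /** Computes where the CSV file for the given day and program would be stored. */
    static Path csvFileFor(final Path rootDirectory, final LocalDate day, final int programId) {
        // LocalDate.toString() yields the ISO form yyyy-MM-dd, matching the
        // "yyyy-MM-dd" pattern used for the daily directory.
        return rootDirectory.resolve(day.toString()).resolve(programId + ".csv");
    }

    public static void main(final String[] args) {
        System.out.println(csvFileFor(Paths.get("imports"), LocalDate.of(2009, 12, 13), 1234));
    }
}
```

Because the day is part of the path, a file downloaded yesterday never satisfies today's existence check.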


import java.net.URL;

/**
 * Represents an importable program.
 *
 * @author reik.schatz Dec 13, 2009
 */
public interface Importable {

    int getProgramId();

    URL getDataFile();
}
Importable.java


import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;

/**
 * An {@link Importable} which wraps all parameters for the German affiliate program Bakker.
 *
 * @author reik.schatz Dec 13, 2009
 */
public class Bakker implements Importable, Serializable {

    private static final long serialVersionUID = 7526472295622776147L;

    private final int _programId;
    private final URL _dataFile;

    public Bakker(final int programId, final String dataFileLocation) {
        _programId = programId;
        try {
            _dataFile = new URL(dataFileLocation);
        } catch (MalformedURLException e) {
            // Keep the original exception as the cause for easier debugging.
            throw new IllegalArgumentException("Given dataFileLocation " + dataFileLocation + " is not a valid URL", e);
        }
    }

    public int getProgramId() {
        return _programId;
    }

    public URL getDataFile() {
        return _dataFile;
    }

    // Two instances are equal when they refer to the same program id.
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        final Bakker bakker = (Bakker) o;
        return _programId == bakker._programId;
    }

    @Override
    public int hashCode() {
        return _programId;
    }

    @Override
    public String toString() {
        return "Bakker{" +
                "_programId=" + _programId +
                ", _dataFile=" + _dataFile +
                '}';
    }
}
Bakker.java

Every JobExecutionDecider must implement the decide method. I get the CSV file from the CsvFileFacade, which will be a different file depending on the affiliate program and the day on which you run the job. If the CSV file exists, I return FlowExecutionStatus.COMPLETED, which invokes Step 3. Otherwise I return FlowExecutionStatus.FAILED, which invokes Step 2 to download the file.


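import java.io.File;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;

/**
 * Tests for the existence of the csv file in the specified
 * {@link CsvFileFacade}.
 *
 * @author reik.schatz Dec 11, 2009
 */
public class DoesCsvExistDecision implements JobExecutionDecider {

    private final CsvFileFacade _csvFileFacade;

    public DoesCsvExistDecision(final CsvFileFacade csvFileFacade) {
        _csvFileFacade = csvFileFacade;
    }

    public FlowExecutionStatus decide(final JobExecution jobExecution, final StepExecution stepExecution) {
        final File csvFile = _csvFileFacade.getCsvFile();
        if (csvFile.isFile()) {
            // File is already there: skip the download.
            return FlowExecutionStatus.COMPLETED;
        } else {
            // File is missing: the flow continues with the download step.
            return FlowExecutionStatus.FAILED;
        }
    }
}
DoesCsvExistDecision.java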

The file download is wrapped in the DownloaderTasklet. The Tasklet is again injected with a reference to the CsvFileFacade. Using the facade and FileUtils from commons-io, I download the CSV file and store it on disk.


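import java.io.File;
import java.io.IOException;
import java.net.URL;

import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * Downloads the csv file.
 *
 * @author reik.schatz Dec 11, 2009
 */
public class DownloaderTasklet implements Tasklet {

    private final CsvFileFacade _csvFileFacade;

    public DownloaderTasklet(final CsvFileFacade csvFileFacade) {
        _csvFileFacade = csvFileFacade;
    }

    public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
        final File csvFile = _csvFileFacade.getCsvFile();
        final URL location = _csvFileFacade.getDataFileURL();
        try {
            // Streams the remote file to disk in one call.
            FileUtils.copyURLToFile(location, csvFile);
        } catch (IOException e) {
            throw new IllegalStateException("Unable to download csv file.", e);
        }

        return RepeatStatus.FINISHED;
    }
}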
DownloaderTasklet.java
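For readers who want to see roughly what the download step boils down to, here is a JDK-only sketch (Java 7+ assumed). It is not the commons-io implementation; the class name `DownloadSketch`, the timeout parameter and the sample CSV content are assumptions for illustration. The timeouts are an addition of mine: they make a stalled server fail the step instead of blocking it.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical JDK-only counterpart to the copy step; not the project's code.
class DownloadSketch {

    /** Copies the resource behind source to target and returns the bytes written. */
    static long download(final URL source, final Path target, final int timeoutMillis) {
        try {
            final URLConnection connection = source.openConnection();
            connection.setConnectTimeout(timeoutMillis);
            connection.setReadTimeout(timeoutMillis);

            // Create missing parent directories before writing.
            final Path parent = target.toAbsolutePath().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            try (InputStream in = connection.getInputStream()) {
                return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Unable to download " + source, e);
        }
    }

    /** Usage example: "downloads" a local temp file through a file: URL. */
    static boolean roundTripDemo() {
        try {
            final Path source = Files.createTempFile("products", ".csv");
            // Made-up sample data.
            Files.write(source, "id;name\n1;Tulip\n".getBytes(StandardCharsets.UTF_8));
            final Path target = Files.createTempDirectory("import")
                    .resolve("2009-12-13").resolve("1.csv");

            final long bytes = download(source.toUri().toURL(), target, 5000);
            return bytes == Files.size(source)
                    && Files.readAllLines(target, StandardCharsets.UTF_8).get(1).equals("1;Tulip");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The demo method uses a local file so that it runs without network access; with a real affiliate program you would pass the URL returned by getDataFileURL().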

The job ends for now in Step 3, which simply invokes log4j one more time.


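import java.util.Date;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * Contains actions to be done when a Job is finishing.
 *
 * @author reik.schatz Dec 11, 2009
 */
public class FinishingTasklet implements Tasklet {
    // Log under this class's own name, not InitializingTasklet's.
    private static final Logger LOGGER = Logger.getLogger(FinishingTasklet.class);

    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    public RepeatStatus execute(final StepContribution stepContribution, final ChunkContext chunkContext) throws Exception {
        LOGGER.debug("Finished at " + DATE_FORMAT.format(new Date()) + ".");

        return RepeatStatus.FINISHED;
    }
}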
FinishingTasklet.java

Unit testing could not be easier.


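import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.AbstractJobTests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;

/**
 * Tests the import job.
 *
 * @author reik.schatz Dec 11, 2009
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/applicationContext.xml" })
public class ImportJobTest extends AbstractJobTests {

    @Autowired
    private CsvFileFacade _csvFileFacade;

    @Transactional
    @Test
    public void testChain() throws Exception {
        final JobExecution jobExecution = this.launchJob();
        // JUnit expects the expected value first, then the actual one.
        assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
        assertTrue(_csvFileFacade.getCsvFile().exists());
    }
}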
ImportJobTest.java

You can download the full source code from Google Groups. The zip archive also contains the remaining parts of the Spring configuration, which are needed to wire all beans together.