public class ImportWorker.Library
extends java.lang.Object

| Constructor and Description |
|---|
| Library() |

| Modifier and Type | Method and Description |
|---|---|
| void | addSequence(java.lang.String historical, java.lang.String sqlName): Adds a sequence to the map of sequences to be initialized from the sequences export file. |
| private int | adjustedCounter(int counter, int badIndex, int dropCount): Adjusts the given counter to take the specified badIndex and dropCount values into account. |
| private void | createIndex(Session session, P2JIndex index, boolean unique, java.lang.String fileName): Creates an index based upon the specified definition. |
| private void | createIndexes(java.util.Collection<P2JIndex> indexes, boolean unique, java.lang.String fileName): Creates all indexes in the specified list which match the uniqueness specified by unique. |
| private void | dropIndex(Session session, P2JIndex index, java.lang.String fileName): Drops an index based upon the specified definition. |
| private void | dropIndexes(java.util.Collection<P2JIndex> indexes, java.lang.String fileName): Sequentially drops all of the specified indexes from the database. |
| java.lang.Object | executeUniqueResultSqlQuery(Session session, java.lang.String queryText): Executes a query statement that returns a unique result and returns the respective value. |
| private void | generateCreateIndexes(java.io.OutputStream os, java.lang.String delimiter, java.util.Collection<P2JIndex> indexes, boolean unique): Generates the DDL to create all of the specified indexes and writes this SQL to the given stream. |
| private void | generateDropIndexes(java.io.OutputStream os, java.lang.String delimiter, java.util.Collection<P2JIndex> indexes): Generates the DDL to drop all of the specified indexes from the database and writes this SQL to the given stream. |
| java.lang.Class<? extends Record> | getDmoClass(java.lang.String ifaceName): Gets the implementation class mapped to the specified interface. |
| java.lang.String | getWordTableName(java.lang.String ifaceName, java.lang.String indexName): Gets the word table name for the given DMO and index name. |
| private void | importAsync(ImportWorker.ImportBundle bundle, java.lang.String dateOrder, java.lang.Long yearOffset): Imports a single table on a dedicated import thread. |
| int | importTable(ImportWorker.SqlRecordLoader loader, java.util.List<P2JIndex> indexes): Imports all data for a table into the database, using the given record loader and current session factory. |
| void | initialize(java.util.Properties props, java.lang.String dataPath): Initializes the object using the information specified in the given Properties and builds the resulting session factory. |
| long | initializeSequences(java.lang.String path, java.lang.String seqDumpFile): Sets the values of the sequences. |
| private void | messageWithStats(java.lang.String message): Prints a message to stdout, followed by the current import statistics. |
| private ImportWorker.ImportBundle | nextBundle(): Returns the next import bundle which is eligible for import. |
| Session | openDatabaseSession(): Opens a new database session. |
| private void | populateWordTable(java.sql.Connection conn, java.lang.String wordTableName, java.lang.Long primaryKey, int index, java.lang.String[] words, boolean toUppercase, java.lang.String fileName): Populates word tables with data. |
| private void | populateWordTable(java.sql.Connection conn, java.lang.String wordTableName, java.lang.Long primaryKey, java.lang.String[] words, boolean toUppercase, java.lang.String fileName): Populates word tables with data. |
| private void | populateWordTables(Session session, BaseRecord dmo, java.util.List<ImportWorker.WordTableInfo> wordTables, java.lang.String fileName): Populates word tables with data from the record. |
| void | prepareImport(ImportWorker.SqlRecordLoader loader, java.util.List<P2JIndex> indexes): Stores the given objects in preparation for an import run. |
| void | reindex(java.lang.String table, java.util.List<P2JIndex> indexes, java.lang.String fileName): Drops and recreates the list of indexes for the given table. |
| private int | removeFailingRecord(java.util.List<java.lang.Object> records, PersistenceException exc, int counter, java.lang.String fileName): Deprecated. The mechanism used to determine which statement in the batch failed stopped working (BatchUpdateException.getUpdateCounts now reports Statement.EXECUTE_FAILED for every statement in the batch), so this method is no longer reliable. |
| java.lang.String | reportErrors(): Reports any errors encountered. |
| long | runImport(): Commences an import run, potentially on multiple threads. |
| void | setMaximumThreads(int maximumThreads): Sets the maximum allowable threads. |
public void initialize(java.util.Properties props, java.lang.String dataPath)

Initializes the object using the information specified in the given Properties and builds the resulting session factory. One particular parameter, JDBC batch size, is explicitly configured to a default value if it is not specified in the configuration file. In addition, the names of all text columns are mapped by table name for use during index creation.

Parameters:
    props - The import properties.
    dataPath - Path of the directory containing data export files, and parent directory of the LOB export directory, if any.

public Session openDatabaseSession() throws PersistenceException

Opens a new database session.

Throws:
    PersistenceException - If any issue is encountered during the operation.

public java.lang.Object executeUniqueResultSqlQuery(Session session, java.lang.String queryText) throws PersistenceException

Executes a query statement that returns a unique result and returns the respective value.

Parameters:
    session - The session that holds the connection to the database to be used.
    queryText - The SQL query statement.

Throws:
    PersistenceException - If any issue is encountered during the operation.

public void setMaximumThreads(int maximumThreads)

Sets the maximum number of import threads which may be active at one time.

Parameters:
    maximumThreads - Maximum number of import threads which may be active at one time.

public void prepareImport(ImportWorker.SqlRecordLoader loader, java.util.List<P2JIndex> indexes)

Stores the given objects in preparation for an import run. The import does not begin until the runImport() method is invoked.

Parameters:
    loader - Record loader helper which contains all information necessary to find the import data and to map each record in the import file to a DMO of the appropriate class.
    indexes - A list of index definitions which must be applied to the table before (for unique indexes) and after (for non-unique indexes) data import.

See Also:
    setMaximumThreads(int)
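The split prepareImport describes, unique indexes applied before the data import and non-unique ones applied after, amounts to partitioning the index list by uniqueness. A minimal, self-contained sketch; the IndexDef record and its unique() accessor are stand-ins for P2JIndex, whose real API is not shown here:

```java
import java.util.*;
import java.util.stream.*;

public class IndexPartition {
    // Stand-in for P2JIndex: only the two properties needed here (hypothetical).
    record IndexDef(String name, boolean unique) { }

    // Split index definitions into the unique group (applied before import)
    // and the non-unique group (applied after import).
    static Map<Boolean, List<IndexDef>> partition(List<IndexDef> defs) {
        return defs.stream()
                   .collect(Collectors.partitioningBy(IndexDef::unique));
    }

    public static void main(String[] args) {
        List<IndexDef> defs = List.of(
            new IndexDef("pk_customer", true),
            new IndexDef("idx_name", false),
            new IndexDef("idx_ssn", true));
        Map<Boolean, List<IndexDef>> split = partition(defs);
        System.out.println(split.get(true).size());   // unique indexes: 2
        System.out.println(split.get(false).size());  // non-unique indexes: 1
    }
}
```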
public long runImport() throws java.lang.InterruptedException

Commences an import run, potentially on multiple threads. All tables prepared via prepareImport(com.goldencode.p2j.schema.ImportWorker.SqlRecordLoader, java.util.List<com.goldencode.p2j.persist.P2JIndex>) will be imported.

This method organizes the tables into an order which is intended to maximize throughput over the import run, by scheduling the tables with the most time-consuming import paths to run earliest in the process. Multiple, parallel threads are used to import multiple tables at once. This method blocks until all prepared tables have been imported.

Throws:
    java.lang.InterruptedException - if the thread on which this method is invoked is interrupted.

See Also:
    prepareImport(com.goldencode.p2j.schema.ImportWorker.SqlRecordLoader, java.util.List<com.goldencode.p2j.persist.P2JIndex>), setMaximumThreads(int)
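The scheduling policy described above, most expensive tables first on a bounded pool of worker threads, can be sketched with standard java.util.concurrent types. The Table record and its cost field are invented for illustration; FWD's actual cost model is not part of this sketch:

```java
import java.util.*;
import java.util.concurrent.*;

public class ImportScheduler {
    // Hypothetical per-table descriptor with an estimated import cost.
    record Table(String name, long estimatedCost) { }

    // Sort the tables so the most expensive imports start first, run them on
    // a bounded pool, and return the names in submission order.
    static List<String> schedule(List<Table> tables, int maxThreads) {
        List<Table> ordered = new ArrayList<>(tables);
        ordered.sort(Comparator.comparingLong(Table::estimatedCost).reversed());
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        List<String> submitted = new ArrayList<>();
        for (Table t : ordered) {
            submitted.add(t.name());
            pool.submit(() -> { /* the per-table import would run here */ });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);  // block until all tasks finish
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return submitted;
    }

    public static void main(String[] args) {
        List<Table> tables = List.of(
            new Table("small", 10), new Table("huge", 1000), new Table("mid", 100));
        System.out.println(schedule(tables, 2));  // [huge, mid, small]
    }
}
```

This is the longest-processing-time-first heuristic: starting the slowest tables early reduces the chance that one large table is still importing alone after every other thread has gone idle.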
public java.lang.String reportErrors()

Reports any errors encountered during the import run. If no errors were recorded, this method returns null immediately.

public void addSequence(java.lang.String historical, java.lang.String sqlName)

Adds a sequence to the map of sequences to be initialized from the sequences export file.

Parameters:
    historical - The original name of the sequence.
    sqlName - The converted, SQL-compliant name of the sequence.

public long initializeSequences(java.lang.String path, java.lang.String seqDumpFile)

Sets the values of the sequences.

Parameters:
    path - Path to the file to which the sequences' values have been exported.
    seqDumpFile - The file to which the sequences' values have been exported.

public int importTable(ImportWorker.SqlRecordLoader loader, java.util.List<P2JIndex> indexes)
Imports all data for a table into the database, using the given record loader and the current session factory. Uses Stream.readField(com.goldencode.p2j.util.BaseDataType[]) to scan the input file. Each record's data is stored in a new instance of a data model object (DMO). If the DMO refers to any other DMOs, the necessary queries are executed to resolve these references before the object is persisted. This ensures foreign key associations are correctly resolved at import time.

The import uses JDBC batching to send data to the database in batches rather than a single record at a time. The batch size is retrieved from the configuration, and is set to a default if not specified there.

This method is designed to be fairly fault tolerant. Database errors which are the result of data that does not conform to the expected schema generally are handled by dropping the offending record, adding a log entry to report the nature of the error, and continuing with the next record, if any. Some error conditions are recoverable (among them those involving mapBytes); other errors are not. Errors are logged as configured for the current VM.

Parameters:
    loader - Record loader helper which contains all information necessary to find the import data and to map each record in the import file to a DMO of the appropriate class.
    indexes - A list of index definitions which must be applied to the table before (for unique indexes) and after (for non-unique indexes) data import.

private void populateWordTables(Session session, BaseRecord dmo, java.util.List<ImportWorker.WordTableInfo> wordTables, java.lang.String fileName) throws PersistenceException, java.sql.SQLException
Populates word tables with data from the record.

Parameters:
    session - Database session.
    dmo - Master record.
    wordTables - List containing data to be used when populating the word tables.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException - if a FWD internal problem occurred.
    java.sql.SQLException - if a SQL related problem occurred.

private void populateWordTable(java.sql.Connection conn, java.lang.String wordTableName, java.lang.Long primaryKey, java.lang.String[] words, boolean toUppercase, java.lang.String fileName) throws java.sql.SQLException

Populates a word table with data.

Parameters:
    conn - Database connection.
    wordTableName - The word table name.
    primaryKey - The primary key of the master record.
    words - The words in the field.
    toUppercase - Instructs to convert the words to uppercase.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    java.sql.SQLException - if a SQL related problem occurred.

private void populateWordTable(java.sql.Connection conn, java.lang.String wordTableName, java.lang.Long primaryKey, int index, java.lang.String[] words, boolean toUppercase, java.lang.String fileName) throws java.sql.SQLException
Populates a word table with data.

Parameters:
    conn - Database connection.
    wordTableName - The word table name.
    primaryKey - The primary key of the master record.
    index - The index in the extent.
    words - The words in the field.
    toUppercase - Instructs to convert the words to uppercase.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    java.sql.SQLException - if a SQL related problem occurred.

public void reindex(java.lang.String table, java.util.List<P2JIndex> indexes, java.lang.String fileName) throws PersistenceException
Drops and recreates the list of indexes for the given table. Indexes of table which are not within the list are ignored. Indexes to be dropped are identified by the names provided in the list of index definitions; the remainder of each definition is not used to identify the indexes to be dropped.

Parameters:
    table - Table which is to be reindexed.
    indexes - List of index definitions.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException - if any error occurs opening or closing sessions, or obtaining database connections.

public java.lang.Class<? extends Record> getDmoClass(java.lang.String ifaceName) throws java.lang.ClassNotFoundException
Gets the implementation class mapped to the specified interface.

Parameters:
    ifaceName - The fully qualified interface name for which an implementing class name is being looked up.

Returns:
    The implementation class mapped to iface, or null if no such mapping is found.

Throws:
    java.lang.ClassNotFoundException - if the ifaceName parameter is not recognized.
    java.lang.IllegalArgumentException - if the ifaceName parameter is not recognized.

public java.lang.String getWordTableName(java.lang.String ifaceName, java.lang.String indexName)

Gets the word table name for the given DMO and index name.

Parameters:
    ifaceName - DMO interface name.
    indexName - Index name.

private ImportWorker.ImportBundle nextBundle() throws java.lang.InterruptedException
Returns the next import bundle which is eligible for import; if no bundles remain in the queue, returns null.

This method will block if the maximum number of active threads has been reached. It will unblock once a slot becomes available.

Returns:
    The next import bundle, or null if no bundles remain in the queue.

Throws:
    java.lang.InterruptedException - if the running thread is interrupted while waiting for an import thread slot to become available.

private void importAsync(ImportWorker.ImportBundle bundle, java.lang.String dateOrder, java.lang.Long yearOffset)
Imports a single table on a dedicated import thread. Once a thread slot opens up, a new thread is launched to import the table associated with bundle. If this table is dependent upon the import of other tables, and not all of those tables have yet been imported, the new thread will block until all such dependencies have been resolved. If any tables upon which the target table is dependent have encountered fatal errors, the import of the target table is aborted and the target table is added to the set of tables with fatal errors.

Parameters:
    bundle - Import bundle which holds all information necessary to import data into a single table.
    dateOrder - The date format to be used when importing if the PSC footer is not found in the file.
    yearOffset - The offset year to be used when importing if the PSC footer is not found in the file.

See Also:
    setMaximumThreads(int)
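The thread-slot behavior described for nextBundle and importAsync (callers block until one of the maximumThreads slots frees up, then a dedicated worker thread is launched) is the classic counting-semaphore pattern. A self-contained sketch under that assumption, not FWD's actual implementation:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class SlotLimiter {
    private final int maximumThreads;
    private final Semaphore slots;
    final AtomicInteger active = new AtomicInteger();  // currently running tasks
    final AtomicInteger peak = new AtomicInteger();    // highest concurrency observed

    SlotLimiter(int maximumThreads) {
        this.maximumThreads = maximumThreads;
        this.slots = new Semaphore(maximumThreads);
    }

    // Block until an import thread slot is free, then run the task on a
    // new thread; the slot is released when the task completes.
    void launch(Runnable task) {
        slots.acquireUninterruptibly();          // block while all slots are taken
        new Thread(() -> {
            peak.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                task.run();
            } finally {
                active.decrementAndGet();
                slots.release();                 // open the slot for the next bundle
            }
        }).start();
    }

    // Wait until every launched task has finished (all slots free again).
    void awaitIdle() {
        slots.acquireUninterruptibly(maximumThreads);
        slots.release(maximumThreads);
    }

    public static void main(String[] args) {
        SlotLimiter limiter = new SlotLimiter(2);
        for (int i = 0; i < 6; i++) {
            limiter.launch(() -> {
                try { Thread.sleep(25); } catch (InterruptedException ie) { }
            });
        }
        limiter.awaitIdle();
        System.out.println("peak concurrency: " + limiter.peak.get());  // never above 2
    }
}
```

The semaphore guarantees that at most maximumThreads workers hold a permit at once, which is exactly the invariant the setMaximumThreads(int) contract describes.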
private void messageWithStats(java.lang.String message)

Prints a message to stdout, followed by the current import statistics.

Parameters:
    message - The core message to be printed. May be null if only statistics are desired.

private void generateDropIndexes(java.io.OutputStream os, java.lang.String delimiter, java.util.Collection<P2JIndex> indexes) throws java.io.IOException

Sequentially generates the DDL to drop all of the specified indexes from the database and writes this SQL to the given stream.

Parameters:
    os - Output destination.
    delimiter - End of statement delimiter.
    indexes - Collection of indexes to be dropped.

Throws:
    java.io.IOException
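In essence, generateDropIndexes writes one DROP INDEX statement per definition, each terminated by the given delimiter. A hedged sketch using plain index names in place of P2JIndex (the exact DDL is dialect-specific, and the real method presumably derives names from the index definitions):

```java
import java.io.*;
import java.util.*;

public class DropIndexDdl {
    // Write "DROP INDEX <name><delimiter>" per index to the stream.
    static void generateDropIndexes(OutputStream os, String delimiter,
                                    Collection<String> indexNames)
            throws IOException {
        Writer w = new OutputStreamWriter(os);
        for (String name : indexNames) {
            w.write("DROP INDEX " + name + delimiter + "\n");
        }
        w.flush();   // flush but do not close: the caller owns the stream
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        generateDropIndexes(buf, ";", List.of("idx_name", "idx_ssn"));
        System.out.print(buf);   // DROP INDEX idx_name; then DROP INDEX idx_ssn;
    }
}
```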
private void dropIndexes(java.util.Collection<P2JIndex> indexes, java.lang.String fileName) throws PersistenceException

Sequentially drops all of the specified indexes from the database.

Parameters:
    indexes - Collection of indexes to be dropped.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException - if any error occurs opening or closing a session, or obtaining a database connection.

private void dropIndex(Session session, P2JIndex index, java.lang.String fileName) throws PersistenceException

Drops an index based upon the specified definition.

Parameters:
    session - The session to use.
    index - Definition of the index to be dropped.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException - if an error occurs at the database during execution of the index drop statement.

private void generateCreateIndexes(java.io.OutputStream os, java.lang.String delimiter, java.util.Collection<P2JIndex> indexes, boolean unique) throws java.io.IOException
Sequentially generates the DDL to create all of the specified indexes and writes this SQL to the given stream.

Parameters:
    os - Output destination.
    delimiter - End of statement delimiter.
    indexes - Collection of indexes to be created.
    unique - If true, DDL is generated for the unique indexes in the collection; if false, for the non-unique indexes.

Throws:
    java.io.IOException
private void createIndexes(java.util.Collection<P2JIndex> indexes, boolean unique, java.lang.String fileName) throws PersistenceException

Creates all indexes in the specified list which match the uniqueness specified by unique. If none match, no indexes are created.

Parameters:
    indexes - A list of index definitions.
    unique - If true, all unique indexes in the list are created; if false, all non-unique indexes in the list are created.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException
private void createIndex(Session session, P2JIndex index, boolean unique, java.lang.String fileName) throws PersistenceException

Creates an index based upon the specified definition.

Parameters:
    session - The session to use.
    index - Definition of the index to be created.
    unique - If true, create the index as a unique index; if false, omit the unique constraint.
    fileName - The name of the file from which the data is read. Used for debugging.

Throws:
    PersistenceException - if an error occurs at the database during index creation.

private int adjustedCounter(int counter, int badIndex, int dropCount)
Adjusts the given counter to take the specified badIndex and dropCount values into account.

Parameters:
    counter - Number of total records read, but not necessarily imported.
    badIndex - Index of the first problem record in an import batch, or -1 if there was no problem record. The index is zero-based, relative to the beginning of the batch.
    dropCount - Cumulative number of problem records which have been dropped from the import of the current table.

@Deprecated
private int removeFailingRecord(java.util.List<java.lang.Object> records, PersistenceException exc, int counter, java.lang.String fileName) throws PersistenceException
Deprecated. Somewhere along the way, the mechanism we were using to determine which statement in the batch failed stopped working (BatchUpdateException.getUpdateCounts now reports Statement.EXECUTE_FAILED for every statement in the batch), so this method is no longer reliable.

Examines the chain of causes within exc until an exception of type BatchUpdateException is found. If such a cause is found, the number of successful updates is extracted. This number represents the index of the first record which was not successful; it is this record which is removed from the batch.

An error message is logged recording the 1-based index of the problem record, the data file from which it was read, and the nature of the error as extracted from exc.

The purpose of this method is to preserve information regarding the failure, while dropping the problem record in order to clean up the current batch of records and enable a retry of the batch insert.

Parameters:
    records - List of records which comprised the failed batch.
    exc - Exception which was caught on batch insert failure.
    counter - Number of total records read up to this point. This number will be at most batchSize higher than the number of records successfully imported to this point.
    fileName - The name of the file from which the data is read. Used for debugging.

Returns:
    The index, within records, of the problem record.

Throws:
    PersistenceException - in the event exc doesn't contain a chained BatchUpdateException. In this case, the index of a problem record cannot be determined, and it is likely that the error is more severe than a single bad record.
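The deprecation note can be illustrated with plain JDBC types. While a driver reports honest per-statement counts, the position of the first Statement.EXECUTE_FAILED entry (or, for drivers that truncate the counts, the number of successful entries) pinpoints the bad record; once every entry is reported as failed, that position is always zero and carries no information. A self-contained illustration:

```java
import java.sql.*;

public class BatchFailureIndex {
    // Return the zero-based index of the first failed statement in the batch.
    // If no entry is flagged, the driver truncated the counts at the failure,
    // so the failing index is simply the number of counts reported.
    static int firstFailure(BatchUpdateException bue) {
        int[] counts = bue.getUpdateCounts();
        for (int i = 0; i < counts.length; i++) {
            if (counts[i] == Statement.EXECUTE_FAILED) {
                return i;
            }
        }
        return counts.length;
    }

    public static void main(String[] args) {
        // A driver that reports two successes, then the failure.
        BatchUpdateException partial =
            new BatchUpdateException("bad row", new int[] { 1, 1, Statement.EXECUTE_FAILED });
        System.out.println(firstFailure(partial));   // 2

        // The behavior the deprecation note describes: every statement flagged
        // as failed, so the "first failure" (index 0) no longer identifies the bad row.
        BatchUpdateException all =
            new BatchUpdateException("bad row", new int[] { Statement.EXECUTE_FAILED,
                                                            Statement.EXECUTE_FAILED });
        System.out.println(firstFailure(all));       // 0
    }
}
```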