public final class ImportWorker extends AbstractPatternWorker

Imports Progress *.d data dump files into a relational database using Hibernate. Defines a number of inner helper classes, most of which are configured before a table is imported.
This worker enables a multi-threaded import, where different tables are loaded in parallel threads. The number of threads which may actively be importing data at once is set with the setMaximumThreads method, which is exposed to TRPL as a user function.
The idiom used to run an import is to walk through the P2O tree of the target database using the schema/import.xml ruleset. This ruleset prepares the necessary ImportWorker.SqlRecordLoader instances and the lists of indexes which must be created for each table. It then invokes the prepareImport user function for each table, which creates the necessary data structures in preparation for the import.
When all tables have been prepared, the import ruleset invokes the runImport user function. This analyzes the tables targeted for import, their associated data export files, and their dependencies and dependent tables to determine the critical paths which must be traversed during the import. The tables are then sorted into priority order to ensure that all critical paths are traversed as directly as possible.
Once tables have been prioritized and sorted, the import begins. The prioritized queue of import bundles is traversed in order, and a thread is launched for each import bundle in the queue, up to the maximum number of active threads. When a thread completes importing a table, the next available bundle is removed from the queue and processed. A bundle is only allowed to commence if all tables upon which it is dependent have been imported first. If this is not the case, this bundle is skipped and the next waiting bundle is tested. This continues until a bundle is found which can commence immediately, or until the end of the bundle queue is reached. If no available bundle was found, but the bundle queue is not empty, the main thread waits until one of the active threads completes its bundle, then runs through the queue again, looking for an available candidate. This process continues until the bundle queue is empty.
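The scheduling loop described above can be sketched as follows. This is a minimal illustration, not the actual ImportWorker code: the Bundle class, the run method, and the sample table names are all hypothetical, and the real implementation tracks far more state per bundle. The sketch shows the core idea: scan the queue for the first bundle whose dependencies are complete, launch it on a pool thread, and wait for a completion notification when nothing is eligible.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal sketch of a dependency-aware bundle scheduler. Illustrative only;
// names (Bundle, run, maxThreads) are not the actual ImportWorker API.
public class BundleScheduler
{
   public static class Bundle
   {
      final String table;
      final Set<String> deps;

      public Bundle(String table, String... deps)
      {
         this.table = table;
         this.deps = new HashSet<>(Arrays.asList(deps));
      }
   }

   public static List<String> run(List<Bundle> queue, int maxThreads)
   {
      ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
      Set<String> complete = Collections.synchronizedSet(new HashSet<>());
      List<String> order = Collections.synchronizedList(new ArrayList<>());
      Object monitor = new Object();

      synchronized (monitor)
      {
         while (!queue.isEmpty())
         {
            // find the first bundle whose dependencies have all been imported
            Bundle next = null;
            for (Bundle b : queue)
            {
               if (complete.containsAll(b.deps))
               {
                  next = b;
                  break;
               }
            }

            if (next == null)
            {
               // no eligible bundle; wait for an active thread to finish one
               try
               {
                  monitor.wait();
               }
               catch (InterruptedException exc)
               {
                  Thread.currentThread().interrupt();
                  break;
               }
               continue;
            }

            queue.remove(next);
            Bundle bundle = next;
            pool.execute(() ->
            {
               order.add(bundle.table);    // stand-in for the actual table import
               complete.add(bundle.table);
               synchronized (monitor)
               {
                  monitor.notifyAll();     // let the main thread rescan the queue
               }
            });
         }
      }

      pool.shutdown();
      try
      {
         pool.awaitTermination(10, TimeUnit.SECONDS);
      }
      catch (InterruptedException exc)
      {
         Thread.currentThread().interrupt();
      }
      return order;
   }
}
```

With a queue of four hypothetical tables where order_line depends on order and item, and order depends on customer, the returned order always places each table after its dependencies, regardless of thread timing.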
Once all active import threads have completed and no more bundles are waiting in the bundle queue, the import run is complete and the runImport method returns control to TRPL.
Progress statistics are maintained and reported at the completion of each bundle in the queue. These include percent complete (based upon export file bytes processed), records loaded, and average number of records processed across various periods. Because these are only calculated and reported upon completion of a bundle, these statistics trend lower than the actual throughput being achieved. This is because at the point where one thread completes its work for a single table, all other threads are at some state of partial completion of their own bundles, so the actual throughput is not accurately reflected in the current calculations. However, the trade-off for more accurate, real-time statistics would have been a higher level of synchronization and a more complicated algorithm to account for dropped records and actual records imported. Rather than possibly compromise throughput with such an implementation, it was deemed better to settle for less accurate statistics. The final set of statistics reported at the end of the full run is reflective of the actual, average performance achieved.
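As a rough illustration of the per-bundle statistics calculation described above: the class, method, and message format below are hypothetical; only the field names mirror those documented in the field summary. Percent complete is derived from bytes processed versus total export file bytes, and the average throughput from elapsed wall-clock time.

```java
// Minimal sketch of bundle-completion statistics. Illustrative only; field
// names (imported, importedBytes, totalBytes, startTime) mirror the fields
// documented below, but the formulas and message format are assumptions.
public class ImportStats
{
   private long imported;          // records loaded so far
   private double importedBytes;   // export file bytes processed so far
   private final double totalBytes;
   private final long startTime = System.currentTimeMillis();

   public ImportStats(double totalBytes)
   {
      this.totalBytes = totalBytes;
   }

   // invoked by a worker thread when it finishes a bundle
   public synchronized String bundleComplete(long records, double bytes)
   {
      imported += records;
      importedBytes += bytes;

      double pctDone = 100.0 * importedBytes / totalBytes;
      double elapsedSec = (System.currentTimeMillis() - startTime) / 1000.0;
      double recPerSec = elapsedSec > 0.0 ? imported / elapsedSec : 0.0;

      return String.format(java.util.Locale.US,
                           "%.1f%% complete; %d records; %.0f records/sec",
                           pctDone, imported, recPerSec);
   }
}
```

As the text notes, a bytes-based figure reported only at bundle boundaries ignores the partial progress of the other active threads, so it trends below the true instantaneous throughput.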
Moving from a single-threaded (as this worker originally was designed) to a multi-threaded architecture paid huge dividends, particularly on multi-processor hardware. On a 4-processor, dual-core system, running with 8 active threads, an import of approximately 79 million primary records completed in approximately 40% of the time needed for a single-threaded run on the same system. Gains on a uni-processor system are not expected to be as dramatic, though some slight improvement is likely. Performance tends to scale directly with the number of (virtual) processors on a system. It is recommended to run with one thread per virtual processor for best results.
| Modifier and Type | Class and Description |
|---|---|
| private class | ImportWorker.DataFileReader: Utility class which helps read data from Progress exported .d files. |
| private class | ImportWorker.IDContext: Thread local context which tracks the next available primary key ID. |
| private class | ImportWorker.ImportBundle: All information necessary to perform the import of a single table, and to prioritize that import among all tables to be imported in the same run. |
| class | ImportWorker.Library: Library of callback user functions exposed to rulesets. |
| static class | ImportWorker.QueryHelper: A helper class to manage the information necessary to compose and execute a database query using HQL (Hibernate Query Language). |
| static class | ImportWorker.SqlRecordLoader: Helper class which manages all aspects of the mapping between a Progress data export file and the DMO which corresponds with a particular database table. |
| Modifier and Type | Field and Description |
|---|---|
| private int | activeThreads: Number of import threads currently active. |
| private java.util.Set<java.lang.String> | attempted: Set of all DMO classes for which an import was attempted. |
| private int | batchSize: JDBC batch size. |
| private java.util.List<ImportWorker.ImportBundle> | bundleQueue: Queue of import bundles. |
| private static java.lang.String | COLUMN_NAME: Database metadata column name for the column name metadata column. |
| private java.util.Set<java.lang.String> | complete: Set of unqualified DMO class names for tables which were processed. |
| private java.lang.Object | countLock: Lock for record count synchronization. |
| private static java.lang.String | DATA_TYPE: Database metadata column name for the data type metadata column. |
| private static int | DEFAULT_BATCH_SIZE: Default JDBC batch size if not set in the Hibernate configuration. |
| private P2JDialect | dialect: Database dialect in use. |
| private java.util.Set<java.lang.String> | fatal: Set of unqualified DMO class names for tables which had fatal errors. |
| private long | idBracketSize: ID bracket size. |
| private java.util.Deque<ImportWorker.IDContext> | idContextStack: Deque of IDContexts, up to maximumThreads deep. |
| private java.util.Map<java.lang.String,ImportWorker.ImportBundle> | importBundles: Map of unqualified DMO names to import bundles. |
| private long | imported: Number of records imported. |
| private double | importedBytes: Number of bytes imported. |
| private static java.util.logging.Logger | log: Logger. |
| private int | maximumThreads: Number of import threads allowed to be active simultaneously. |
| private long | nextGlobalID: Next ID bracket. |
| private java.util.Map<java.lang.String,java.lang.String> | sequences: Map of historical to SQL sequence names. |
| private org.hibernate.SessionFactory | sessionFactory: Hibernate session factory. |
| private long | startTime: System time at beginning of import. |
| private static java.lang.String | STATS_FMT: Format specifier for an import statistics update message. |
| private static java.lang.String | TABLE: Database metadata table type for regular tables. |
| private static java.lang.String | TABLE_NAME: Database metadata column name for the table name metadata column. |
| private double | totalBytes: Total byte count of all exported data files. |
Fields inherited from class AbstractPatternWorker: resolver
| Constructor and Description |
|---|
| ImportWorker(): Default constructor which initializes the callback library. |
| Modifier and Type | Method and Description |
|---|---|
| private java.lang.String | buildDropIdentitySequence(org.hibernate.dialect.Dialect dialect): Build the dialect-specific DDL to drop the primary ID sequence. |
| private void | createIdentitySequence(): Determines the high-water primary key generated during the import and creates an identity generator sequence whose starting value is that ID. |
| private org.hibernate.Session | openSession(): Open a new database session and set it to flush only on explicit flushes. |
| private java.io.OutputStream | openStream(java.lang.String filename): Open a new buffered output stream for the given target filename. |
| private void | write(java.io.OutputStream os, java.lang.String text): Output the given text to the stream in UTF-8 encoding. |
| private void | writeStmt(java.io.OutputStream os, java.lang.String text, java.lang.String delimiter): Output the given text as a statement (delimited and with a new line) to the stream in UTF-8 encoding. |
Methods inherited from class AbstractPatternWorker: finish, getCopy, getLibrary, getSource, initialize, registerTree, resolveConstant, setLibrary, visitAst
private static final java.util.logging.Logger log
private static final int DEFAULT_BATCH_SIZE
private static final java.lang.String TABLE
private static final java.lang.String TABLE_NAME
private static final java.lang.String COLUMN_NAME
private static final java.lang.String DATA_TYPE
private static final java.lang.String STATS_FMT
private final java.util.Map<java.lang.String,ImportWorker.ImportBundle> importBundles
private final java.util.Map<java.lang.String,java.lang.String> sequences
private final java.util.Set<java.lang.String> complete
private final java.util.Set<java.lang.String> fatal
private final java.util.Set<java.lang.String> attempted
private final java.lang.Object countLock
private final java.util.Deque<ImportWorker.IDContext> idContextStack
Deque of IDContexts, up to maximumThreads deep. This field is final to support the synchronized statement in the ImportWorker.IDContext.prepareNextBracket() method.

private java.util.List<ImportWorker.ImportBundle> bundleQueue
private org.hibernate.SessionFactory sessionFactory
private int batchSize
private P2JDialect dialect
private int maximumThreads
private int activeThreads
private long imported
private double importedBytes
private double totalBytes
private long startTime
private long nextGlobalID
private long idBracketSize
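The nextGlobalID, idBracketSize, and idContextStack fields together suggest bracketed primary key allocation: each thread reserves a whole bracket of IDs at once and then hands out individual values without further synchronization. The following is a minimal sketch of that technique only; the names mirror the fields above, but an AtomicLong and a ThreadLocal stand in for the actual IDContext synchronization, so this is not the real implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of bracketed ID allocation: a thread claims idBracketSize
// IDs in one atomic step, then assigns them locally with no locking.
// Illustrative only; not the actual ImportWorker.IDContext code.
public class IdBrackets
{
   private final AtomicLong nextGlobalID = new AtomicLong(1);
   private final long idBracketSize;

   // per-thread bracket state: [0] = next ID to hand out, [1] = bracket end (exclusive)
   private final ThreadLocal<long[]> context =
      ThreadLocal.withInitial(() -> new long[] { 0, 0 });

   public IdBrackets(long idBracketSize)
   {
      this.idBracketSize = idBracketSize;
   }

   public long nextId()
   {
      long[] ctx = context.get();
      if (ctx[0] >= ctx[1])
      {
         // bracket exhausted; reserve the next one with a single atomic update
         ctx[0] = nextGlobalID.getAndAdd(idBracketSize);
         ctx[1] = ctx[0] + idBracketSize;
      }
      return ctx[0]++;
   }
}
```

The payoff is that contention on the shared counter occurs once per bracket rather than once per record, which matters when multiple import threads are generating primary keys at high rates.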
public ImportWorker()
private org.hibernate.Session openSession()
private java.lang.String buildDropIdentitySequence(org.hibernate.dialect.Dialect dialect)
Parameters:
dialect - The database-specific SQL helper.

private void createIdentitySequence()

private java.io.OutputStream openStream(java.lang.String filename)
Parameters:
filename - Target output file. Must not reference a directory or any resource that cannot be created or written to.
Throws:
java.lang.IllegalArgumentException - If the file cannot be properly accessed or opened.

private void write(java.io.OutputStream os, java.lang.String text) throws java.io.IOException
Parameters:
os - Output destination.
text - Content to write.
Throws:
java.io.IOException

private void writeStmt(java.io.OutputStream os, java.lang.String text, java.lang.String delimiter) throws java.io.IOException
Parameters:
os - Output destination.
text - Content to write.
delimiter - Statement ending delimiter.
Throws:
java.io.IOException
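The write/writeStmt pair can be sketched as follows. This is an illustrative reimplementation based only on the signatures and descriptions above, assuming writeStmt simply appends the delimiter and a newline before delegating to write.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Minimal sketch of the write/writeStmt pattern: write emits raw UTF-8 text,
// writeStmt appends a statement delimiter and a newline. Illustrative only;
// not the actual ImportWorker code.
public class StmtWriter
{
   static void write(OutputStream os, String text) throws IOException
   {
      os.write(text.getBytes(StandardCharsets.UTF_8));
   }

   static void writeStmt(OutputStream os, String text, String delimiter) throws IOException
   {
      write(os, text + delimiter + "\n");
   }
}
```

Encoding the text explicitly as UTF-8 keeps the generated DDL independent of the platform default charset.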