public final class ImportWorker extends AbstractPatternWorker

Imports Progress *.d data dump files into a relational database. Defines a
number of inner helper classes, most of which are configured before a table
is imported.
This worker enables a multi-threaded import, where different tables are
loaded in parallel threads. The number of threads which may actively be
importing data at once is set with the setMaximumThreads method, which is
exposed to TRPL as a user function.
The idiom used to run an import bundle is to walk through the P2O tree of the
target database using the schema/import.xml ruleset. This ruleset prepares
the necessary ImportWorker.SqlRecordLoader instances and lists of indexes
which must be created for each table. It then invokes the prepareImport user
function for each table, which creates the necessary data structures in
preparation for the import.
When all tables have been prepared, the import ruleset invokes the runImport
user function. This analyzes the tables targeted for import, their
associated data export files, and their dependencies and dependent tables to
determine the critical paths which must be traversed during the import. The
tables are then sorted into priority order to ensure that all critical paths
are traversed as directly as possible.
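The prioritization step described above can be pictured as an ordering by dependency-chain depth, so that tables at the head of long chains start as early as possible. The `Bundle` record and `depth` heuristic below are illustrative stand-ins and not the actual `ImportWorker.ImportBundle` logic:

```java
import java.util.*;

public class BundleSort {
    // Hypothetical stand-in for ImportWorker.ImportBundle: a table name,
    // its data file size, and the tables it depends upon.
    record Bundle(String table, long bytes, Set<String> deps) {}

    // Depth of the longest chain of dependents rooted at this table; tables
    // on long chains lie on a critical path and should be imported early.
    static int depth(String table, Map<String, Bundle> all, Map<String, Integer> memo) {
        Integer cached = memo.get(table);
        if (cached != null) return cached;
        int d = 0;
        for (Bundle b : all.values()) {
            if (b.deps().contains(table)) {
                d = Math.max(d, 1 + depth(b.table(), all, memo));
            }
        }
        memo.put(table, d);
        return d;
    }

    // Sort bundles so that deeper dependency chains, then larger data
    // files, come first in the import queue.
    static List<Bundle> prioritize(Collection<Bundle> bundles) {
        Map<String, Bundle> byName = new HashMap<>();
        for (Bundle b : bundles) byName.put(b.table(), b);
        Map<String, Integer> memo = new HashMap<>();
        List<Bundle> sorted = new ArrayList<>(bundles);
        sorted.sort(Comparator
            .comparingInt((Bundle b) -> depth(b.table(), byName, memo)).reversed()
            .thenComparing(Comparator.comparingLong(Bundle::bytes).reversed()));
        return sorted;
    }
}
```

With this ordering a chain such as customer → order → order_line is started before independent, shallow tables, even when those tables have larger files.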
Once tables have been prioritized and sorted, the import begins. The prioritized queue of import bundles is traversed in order, and a thread is launched for each import bundle in the queue, up to the maximum number of active threads. When a thread completes importing a table, the next available bundle is removed from the queue and processed. A bundle is only allowed to commence if all tables upon which it is dependent have been imported first. If this is not the case, this bundle is skipped and the next waiting bundle is tested. This continues until a bundle is found which can commence immediately, or until the end of the bundle queue is reached. If no available bundle was found, but the bundle queue is not empty, the main thread waits until one of the active threads completes its bundle, then runs through the queue again, looking for an available candidate. This process continues until the bundle queue is empty.
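A minimal sketch of this dispatch loop, using `wait`/`notifyAll` on a shared lock. All names here (`BundleScheduler`, `importTable`, and so on) are hypothetical simplifications of the behavior described above, and dependencies are assumed to be acyclic:

```java
import java.util.*;

public class BundleScheduler {
    // Hypothetical simplification of the runImport dispatch loop.
    record Bundle(String table, Set<String> deps) {}

    private final Object lock = new Object();
    private final Set<String> complete = new HashSet<>();
    private int activeThreads = 0;
    private final int maximumThreads;

    BundleScheduler(int maximumThreads) { this.maximumThreads = maximumThreads; }

    void run(List<Bundle> queue) throws InterruptedException {
        List<Bundle> pending = new LinkedList<>(queue);
        synchronized (lock) {
            while (!pending.isEmpty()) {
                Bundle next = null;
                // Scan the queue for the first bundle whose dependencies
                // have all been imported, if a thread slot is free.
                for (Bundle b : pending) {
                    if (activeThreads < maximumThreads && complete.containsAll(b.deps())) {
                        next = b;
                        break;
                    }
                }
                if (next == null) {
                    lock.wait();           // wait for an active thread to finish
                    continue;              // then rescan the queue
                }
                pending.remove(next);
                activeThreads++;
                Bundle launched = next;
                new Thread(() -> {
                    importTable(launched); // placeholder for per-table work
                    synchronized (lock) {
                        complete.add(launched.table());
                        activeThreads--;
                        lock.notifyAll();  // wake the dispatcher
                    }
                }).start();
            }
            while (activeThreads > 0) {
                lock.wait();               // drain the last active threads
            }
        }
    }

    void importTable(Bundle b) { /* load the table's .d file here */ }

    Set<String> completed() {
        synchronized (lock) { return new HashSet<>(complete); }
    }
}
```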
Once all active import threads have completed and no more bundles are waiting
in the bundle queue, the import run is complete and the runImport method
returns control to TRPL.
Progress statistics are maintained and reported at the completion of each bundle in the queue. These include percent complete (based upon export file bytes processed), records loaded, and average number of records processed across various periods. Because these are only calculated and reported upon completion of a bundle, these statistics trend lower than the actual throughput being achieved. This is because at the point where one thread completes its work for a single table, all other threads are at some state of partial completion of their own bundles, so the actual throughput is not accurately reflected in the current calculations. However, the trade-off for more accurate, real-time statistics would have been a higher level of synchronization and a more complicated algorithm to account for dropped records and actual records imported. Rather than possibly compromise throughput with such an implementation, it was deemed better to settle for less accurate statistics. The final set of statistics reported at the end of the full run is reflective of the actual, average performance achieved.
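The arithmetic behind the reported figures is straightforward. The sketch below uses names that mirror the fields documented later on this page (`importedBytes`, `totalBytes`, `imported`, `startTime`), but it is an illustration, not the worker's actual code:

```java
public class ImportStats {
    // Percent complete, based upon export file bytes processed so far.
    static double percentComplete(double importedBytes, double totalBytes) {
        return totalBytes == 0.0 ? 100.0 : 100.0 * importedBytes / totalBytes;
    }

    // Average records imported per second since the start of the run; this
    // is the whole-run average, which is why per-bundle snapshots trend
    // lower than instantaneous throughput.
    static double recordsPerSecond(long imported, long startTimeMillis, long nowMillis) {
        long elapsed = Math.max(1L, nowMillis - startTimeMillis);
        return imported * 1000.0 / elapsed;
    }
}
```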
Moving from a single-threaded (as this worker originally was designed) to a multi-threaded architecture paid huge dividends, particularly on multi-processor hardware. On a 4-processor, dual-core system, running with 8 active threads, an import of approximately 79 million primary records completed in approximately 40% of the time needed for a single-threaded run on the same system. Gains on a uni-processor system are not expected to be as dramatic, though some slight improvement is likely. Performance tends to scale directly with the number of (virtual) processors on a system. It is recommended to run with one thread per virtual processor for best results.
Modifier and Type | Class and Description
---|---
private static class | ImportWorker.DataFileReader: Utility class that helps reading data from Progress exported .d files.
private class | ImportWorker.IDContext: Thread-local context which tracks the next available primary key ID.
private static class | ImportWorker.ImportBundle: All information necessary to perform the import of a single table, and to prioritize that import among all tables to be imported in the same run.
class | ImportWorker.Library: Library of callback user functions exposed to rulesets.
static class | ImportWorker.SqlRecordLoader: Helper class which manages all aspects of the mapping between a Progress data export file and the DMO which corresponds with a particular database table.
private static class | ImportWorker.WordTableInfo: Auxiliary data for the word table.
Modifier and Type | Field and Description
---|---
private int | activeThreads: Number of import threads currently active.
private java.util.Set&lt;java.lang.String&gt; | attempted: Set of all DMO classes for which an import was attempted.
private int | batchSize: JDBC batch size.
private java.util.List&lt;ImportWorker.ImportBundle&gt; | bundleQueue: Queue of import bundles.
private java.util.Set&lt;java.lang.String&gt; | complete: Set of unqualified DMO class names for tables which were processed.
private java.lang.Object | countLock: Lock object for record count synchronization.
(package private) static com.mchange.v2.c3p0.ComboPooledDataSource | cpds: The data source used to connect to the database when necessary.
private boolean | createGenerator: Do we create the p2j_id_generator_sequence or update it? Default is update.
(package private) static Database | database: The database we import into.
(package private) static java.lang.String | dataPath: Directory path containing export files for the database being imported.
private static int | DEFAULT_BATCH_SIZE: Default JDBC batch size if not in database configuration.
private static java.lang.String | defaultCpStream: The default cpstream attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely.
private static java.lang.String | defaultDateFormat: The default dateformat attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely.
private static java.lang.String | defaultNumFormat: The default numformat attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely.
private Dialect | dialect: Database dialect in use.
private java.util.Map&lt;java.lang.String,java.lang.Integer&gt; | errors: Stores the number of errors for each table individually.
private java.util.Set&lt;java.lang.String&gt; | fatal: Set of unqualified DMO class names for tables which had fatal errors.
private long | idBracketSize: ID bracket size.
private java.util.Deque&lt;ImportWorker.IDContext&gt; | idContextStack: Deque of IDContexts, up to maximumThreads deep.
private java.util.Map&lt;java.lang.String,ImportWorker.ImportBundle&gt; | importBundles: Map of unqualified DMO names to import bundles.
private long | imported: Number of records imported.
private double | importedBytes: Number of bytes imported.
private static byte[] | mapBytes: The mapped bytes used in an attempt to fix broken encoding in dump files.
private int | maximumThreads: Number of import threads allowed to be active simultaneously.
private long | nextGlobalID: Next ID bracket.
(package private) static Settings | ormSettings: The ORM settings, including C3P0 and database connection.
private java.util.Map&lt;java.lang.String,java.lang.String&gt; | sequences: Map of historical to SQL sequence names.
private long | startTime: System time at beginning of import.
private static java.lang.String | STATS_FMT: Format specifier for an import statistics update message.
private double | totalBytes: Total byte count of all exported data files.
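Several of the fields above (`nextGlobalID`, `idBracketSize`, `idContextStack`) cooperate to hand out primary keys in per-thread brackets, so that import threads rarely contend on the shared counter. The following is a speculative sketch of that idea, not the actual `ImportWorker.IDContext` implementation:

```java
public class IdBrackets {
    // Global cursor; each context claims a contiguous bracket of IDs at a
    // time, so the shared lock is only taken once per bracket.
    private long nextGlobalID = 1;
    private final long idBracketSize;
    private final Object lock = new Object();

    IdBrackets(long idBracketSize) { this.idBracketSize = idBracketSize; }

    // Per-thread context, analogous in spirit to ImportWorker.IDContext.
    class IdContext {
        private long next = 0;
        private long limit = 0;

        long nextId() {
            if (next >= limit) {
                synchronized (lock) {       // claim the next bracket
                    next = nextGlobalID;
                    nextGlobalID += idBracketSize;
                    limit = next + idBracketSize;
                }
            }
            return next++;
        }
    }

    IdContext newContext() { return new IdContext(); }
}
```

Each thread draws most of its IDs without synchronization; only the occasional bracket refill touches the shared state.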
Constructor and Description
---
ImportWorker(): Default constructor which initializes the callback library.
Modifier and Type | Method and Description
---|---
private void | createIdentitySequence(): Determines the high-water primary key generated during the import and creates an identity generator sequence whose starting value is that ID.
private Session | openSession(): Open a new database session and set it to flush only on explicit flushes.
private java.io.OutputStream | openStream(java.lang.String filename): Open a new buffered output stream for the given target filename.
private void | write(java.io.OutputStream os, java.lang.String text): Output the given text to the stream in UTF-8 encoding.
private void | writeStmt(java.io.OutputStream os, java.lang.String text, java.lang.String delimiter): Output the given text as a statement (delimited and with a newline) to the stream in UTF-8 encoding.
Inherited methods: finish, getCopy, getLibrary, getResolver, getSource, initialize, isConditionalRuleSets, isFlagActivated, isRuntimeQueryMode, registerTree, resolveConstant, setLibrary, visitAst
Field Detail

static java.lang.String dataPath
    Directory path containing export files for the database being imported.

static Database database
    The database we import into.

static Settings ormSettings
    The ORM settings, including C3P0 and database connection.

static com.mchange.v2.c3p0.ComboPooledDataSource cpds
    The data source used to connect to the database when necessary.

private static java.lang.String defaultNumFormat
    The default numformat attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely. Ex: "46,44".

private static java.lang.String defaultDateFormat
    The default dateformat attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely. Ex: "dmy-1950".

private static java.lang.String defaultCpStream
    The default cpstream attribute for data (.d) files whose PSC footer lacks it, or which lack the PSC footer completely. Ex: "1252".

private static byte[] mapBytes
    The mapped bytes used in an attempt to fix broken encoding in dump files before the Charset will detect an issue with it.

private static final int DEFAULT_BATCH_SIZE
    Default JDBC batch size if not in database configuration.

private static final java.lang.String STATS_FMT
    Format specifier for an import statistics update message.

private final java.util.Map&lt;java.lang.String,ImportWorker.ImportBundle&gt; importBundles
    Map of unqualified DMO names to import bundles.

private final java.util.Map&lt;java.lang.String,java.lang.String&gt; sequences
    Map of historical to SQL sequence names.

private final java.util.Set&lt;java.lang.String&gt; complete
    Set of unqualified DMO class names for tables which were processed.

private final java.util.Set&lt;java.lang.String&gt; fatal
    Set of unqualified DMO class names for tables which had fatal errors.

private final java.util.Set&lt;java.lang.String&gt; attempted
    Set of all DMO classes for which an import was attempted.

private final java.lang.Object countLock
    Lock object for record count synchronization.

private final java.util.Deque&lt;ImportWorker.IDContext&gt; idContextStack
    Deque of IDContexts, up to maximumThreads deep. This field is final to support the synchronized statement in the ImportWorker.IDContext.prepareNextBracket() method.

private java.util.List&lt;ImportWorker.ImportBundle&gt; bundleQueue
    Queue of import bundles.

private int batchSize
    JDBC batch size.

private Dialect dialect
    Database dialect in use.

private int maximumThreads
    Number of import threads allowed to be active simultaneously.

private int activeThreads
    Number of import threads currently active.

private long imported
    Number of records imported.

private final java.util.Map&lt;java.lang.String,java.lang.Integer&gt; errors
    Stores the number of errors for each table individually.

private double importedBytes
    Number of bytes imported.

private double totalBytes
    Total byte count of all exported data files.

private long startTime
    System time at beginning of import.

private long nextGlobalID
    Next ID bracket.

private final long idBracketSize
    ID bracket size.

private boolean createGenerator
    Do we create the p2j_id_generator_sequence or update it? Default is update.

Constructor Detail

public ImportWorker()
    Default constructor which initializes the callback library.
Method Detail

private Session openSession() throws PersistenceException
    Open a new database session and set it to flush only on explicit flushes.
    Throws:
        PersistenceException

private void createIdentitySequence()
    Determines the high-water primary key generated during the import and creates an identity generator sequence whose starting value is that ID.

private java.io.OutputStream openStream(java.lang.String filename)
    Open a new buffered output stream for the given target filename.
    Parameters:
        filename - Target output file. Must not reference a directory or any resource that cannot be created or written to.
    Throws:
        java.lang.IllegalArgumentException - If the file cannot be properly accessed or opened.

private void write(java.io.OutputStream os, java.lang.String text) throws java.io.IOException
    Output the given text to the stream in UTF-8 encoding.
    Parameters:
        os - Output destination.
        text - Content to write.
    Throws:
        java.io.IOException

private void writeStmt(java.io.OutputStream os, java.lang.String text, java.lang.String delimiter) throws java.io.IOException
    Output the given text as a statement (delimited and with a newline) to the stream in UTF-8 encoding.
    Parameters:
        os - Output destination.
        text - Content to write.
        delimiter - Statement ending delimiter.
    Throws:
        java.io.IOException
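The batchSize field documented above governs how many records are queued before a JDBC batch is executed. The following generic sketch shows that buffering pattern; the `flush` callback stands in for the `PreparedStatement.addBatch()`/`executeBatch()` cycle, and every name here is illustrative rather than taken from the worker:

```java
import java.util.*;
import java.util.function.Consumer;

public class BatchLoader<R> {
    // Buffers rows and hands them to 'flush' in groups of batchSize, the
    // same shape as accumulating addBatch() calls and then invoking
    // executeBatch() once the batch is full.
    private final int batchSize;
    private final Consumer<List<R>> flush;
    private final List<R> pending = new ArrayList<>();

    public BatchLoader(int batchSize, Consumer<List<R>> flush) {
        this.batchSize = batchSize;
        this.flush = flush;
    }

    public void add(R row) {
        pending.add(row);
        if (pending.size() >= batchSize) {
            flushNow();
        }
    }

    // Flush any partial batch, e.g. at end of the data file.
    public void flushNow() {
        if (!pending.isEmpty()) {
            flush.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

Larger batch sizes amortize round trips to the database at the cost of memory and a larger unit of rework when a record in the batch fails.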