public abstract class SparkJob extends distributed.core.DistributedJob implements OptionHandler
Modifier and Type | Class and Description |
---|---|
static class | SparkJob.NoKeyTextOutputFormat<K,V> - Subclass of TextOutputFormat that does not write the key. |
Modifier and Type | Field and Description |
---|---|
static java.lang.String | TEST_DATA - The key for a test RDD |
static java.lang.String | TRAINING_DATA - The key for a training RDD |
Constructor and Description |
---|
SparkJob(java.lang.String jobName, java.lang.String jobDescription) - Constructor. |
Modifier and Type | Method and Description |
---|---|
static java.lang.String | addSubdirToPath(java.lang.String parent, java.lang.String subdirName) - Adds a subdirectory to a parent path. |
static boolean | checkFileExists(java.lang.String file) - Check that the named file exists on either the local file system or HDFS. |
org.apache.spark.api.java.JavaSparkContext | createSparkContextForJob(SparkJobConfig conf) - Create a SparkContext for this job. |
java.lang.String | debugTipText() - Tip text for this property. |
static void | deleteDirectory(java.lang.String path) - Delete a directory (and all contents). |
java.lang.String[] | getBaseOptionsOnly() - Return the base options only (not the subclass's options or the options specific to the configuration). |
CachingStrategy | getCachingStrategy() - Get the caching strategy to use for this job. |
Dataset<?> | getDataset(java.lang.String key) - Return a named dataset, or null if the name is unknown. |
java.util.Iterator<java.util.Map.Entry<java.lang.String,Dataset<?>>> | getDatasets() - Return an iterator over the named datasets for this job. |
boolean | getDebug() - Get whether to output debug info. |
static org.apache.hadoop.conf.Configuration | getFSConfigurationForPath(java.lang.String path, java.lang.String[] pathOnly) - Returns a Configuration object configured with the name node and port present in the supplied path (hdfs://host:port/path). |
java.lang.String[] | getOptions() |
static long | getSizeInBytesOfPath(java.lang.String path) - Get the size in bytes of a file/directory. |
org.apache.spark.api.java.JavaSparkContext | getSparkContext() - Get the current Spark context in use by this (and potentially other) jobs. |
SparkJobConfig | getSparkJobConfig() - Get the SparkJobConfig object for this job. |
org.apache.log4j.WriterAppender | initJob(org.apache.spark.api.java.JavaSparkContext context) - Initialize logging and set or create a context to use. |
org.apache.log4j.WriterAppender | initSparkLogAppender() - Initialize and return an appender for hooking into Spark's log4j logging and directing it to Weka's log. |
java.util.Enumeration<Option> | listOptions() |
org.apache.spark.api.java.JavaRDD<Instance> | loadCSVFile(java.lang.String path, Instances headerNoSummary, java.lang.String csvParseOptions, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) - Load a file/directory containing instances in CSV format. |
org.apache.spark.api.java.JavaRDD<Instance> | loadInput(java.lang.String inputPath, Instances headerNoSummary, java.lang.String csvParseOptions, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) - Load an input file/directory. |
org.apache.spark.api.java.JavaRDD<Instance> | loadInstanceObjectFile(java.lang.String path, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) - Load a file/directory of serialized instances (as stored in Spark object file format). |
static java.io.InputStream | openFileForRead(java.lang.String file) - Opens the named file for reading on either the local file system or HDFS. |
static java.io.OutputStream | openFileForWrite(java.lang.String file) - Open the named file for writing to on either the local file system or HDFS. |
static java.io.PrintWriter | openTextFileForWrite(java.lang.String file) - Open the named file as a text file for writing to on either the local file system or any other protocol-specific file system supported by Hadoop. |
void | removeSparkLogAppender(org.apache.log4j.WriterAppender appender) - Remove the supplied appender from Spark's logging. |
static java.lang.String | resolveLocalOrOtherFileSystemPath(java.lang.String original) - Takes an input path and returns a fully qualified absolute one. |
boolean | runJob() |
abstract boolean | runJobWithContext(org.apache.spark.api.java.JavaSparkContext sparkContext) - Clients (subclasses) implement this to do the actual work of the job. |
void | setCachingStrategy(CachingStrategy cs) - Set the caching strategy to use for this job. |
void | setDataset(java.lang.String key, Dataset dataset) - Set a dataset for this job to potentially make use of. |
void | setDebug(boolean d) - Set whether to output debug info. |
void | setOptions(java.lang.String[] options) |
void | shutdownJob(org.apache.log4j.WriterAppender logAppender) - Shuts down the context in use by this job and removes the supplied log appender object (if any) from the Spark logger. |
org.apache.spark.api.java.JavaRDD<Instance> | stringRDDToInstanceRDD(org.apache.spark.api.java.JavaRDD<java.lang.String> input, Instances headerNoSummary, java.lang.String csvParseOptions, CachingStrategy strategy, boolean enforceMaxPartitions) - Process an RDD<String> into an RDD<Instance>. |
Methods inherited from class distributed.core.DistributedJob: environmentSubstitute, getAdditionalWekaPackageNames, getJobName, getJobStatus, getLog, logMessage, logMessage, logMessage, makeOptionsStr, objectRowToInstance, parseInstance, postExecution, preExecution, run, setEnvironment, setJobDescription, setJobName, setJobStatus, setLog, setStatusMessagePrefix, stackTraceToString, statusMessage, stopJob
public static final java.lang.String TRAINING_DATA

The key for a training RDD.

public static final java.lang.String TEST_DATA

The key for a test RDD.
public SparkJob(java.lang.String jobName, java.lang.String jobDescription)

Constructor.

Parameters:
jobName - the name of this job
jobDescription - the description of this job
public static java.lang.String addSubdirToPath(java.lang.String parent, java.lang.String subdirName)

Adds a subdirectory to a parent path.

Parameters:
parent - the parent path (may include the hdfs://host:port part)
subdirName - the name of the subdirectory to add

public static org.apache.hadoop.conf.Configuration getFSConfigurationForPath(java.lang.String path, java.lang.String[] pathOnly)

Returns a Configuration object configured with the name node and port present in the supplied path (hdfs://host:port/path).

Parameters:
path - the URI or local path from which to configure
pathOnly - will hold the path-only part of the URI
public static java.lang.String resolveLocalOrOtherFileSystemPath(java.lang.String original) throws java.io.IOException

Takes an input path and returns a fully qualified absolute one.

Parameters:
original - the original path (either relative or absolute) on a file system

Throws:
java.io.IOException - if a problem occurs

public static void deleteDirectory(java.lang.String path) throws java.io.IOException

Delete a directory (and all contents).

Parameters:
path - the path to the directory to delete

Throws:
java.io.IOException - if the path is not a directory or a problem occurs
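The static path helpers above can be chained when preparing job output locations. The following is a minimal sketch; the weka.distributed.spark package name and the example paths are assumptions made for illustration, not something stated on this page.

```java
// Hedged sketch: package name and paths are assumptions.
import weka.distributed.spark.SparkJob;

public class PathHelperSketch {
  public static void main(String[] args) throws Exception {
    // Build an output location under a (hypothetical) HDFS parent directory
    String parent = "hdfs://namenode:8020/user/weka/experiment1";
    String outputDir = SparkJob.addSubdirToPath(parent, "model-output");

    // Resolve a relative local path to a fully qualified absolute one
    String resolved = SparkJob.resolveLocalOrOtherFileSystemPath("results/output.arff");
    System.out.println("Resolved path: " + resolved);

    // Remove any previous output before a job writes new results
    if (SparkJob.checkFileExists(outputDir)) {
      SparkJob.deleteDirectory(outputDir);
    }
  }
}
```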
"hdfs://host:port/<path>"
file
- the file to open for reading on either the local or HDFS file
systemjava.io.IOException
- if a problem occurspublic static java.io.OutputStream openFileForWrite(java.lang.String file) throws java.io.IOException
"hdfs://host:port/<path>"
. Note
that, on the local file system, the directory path must exist. Under HDFS,
the path is created automatically.file
- the file to write tojava.io.IOException
- if a problem occurspublic static java.io.PrintWriter openTextFileForWrite(java.lang.String file) throws java.io.IOException
protocol://host:port/<path>
."
Note that, on the local file system, the directory path must exist.file
- the file to write tojava.io.IOException
- if a problem occurspublic static boolean checkFileExists(java.lang.String file) throws java.io.IOException
public static boolean checkFileExists(java.lang.String file) throws java.io.IOException

Check that the named file exists on either the local file system or HDFS.

Parameters:
file - the file to check

Throws:
java.io.IOException - if a problem occurs

public static long getSizeInBytesOfPath(java.lang.String path) throws java.io.IOException

Get the size in bytes of a file/directory.

Parameters:
path - the path to the file/directory

Throws:
java.io.IOException - if a problem occurs
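Taken together, the static file helpers let code read and write data without caring whether a path is local or on HDFS. A hedged sketch (the package name and the example URI are assumptions):

```java
// Hedged sketch: package name and the HDFS URI are assumptions.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;

import weka.distributed.spark.SparkJob;

public class FileIOSketch {
  public static void main(String[] args) throws Exception {
    String path = "hdfs://namenode:8020/user/weka/notes.txt"; // hypothetical location

    // Write a small text file (works for local paths too; local parent dirs must exist)
    try (PrintWriter writer = SparkJob.openTextFileForWrite(path)) {
      writer.println("written via SparkJob.openTextFileForWrite");
    }

    // Check the file exists and report its size
    if (SparkJob.checkFileExists(path)) {
      System.out.println("size in bytes: " + SparkJob.getSizeInBytesOfPath(path));
    }

    // Read it back through the same abstraction
    try (BufferedReader reader =
           new BufferedReader(new InputStreamReader(SparkJob.openFileForRead(path)))) {
      System.out.println(reader.readLine());
    }
  }
}
```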
public org.apache.spark.api.java.JavaSparkContext createSparkContextForJob(SparkJobConfig conf) throws WekaException

Create a SparkContext for this job.

Parameters:
conf - the configuration for the job

Throws:
WekaException - if a problem occurs

public java.util.Enumeration<Option> listOptions()
Specified by:
listOptions in interface OptionHandler

public java.lang.String[] getOptions()

Specified by:
getOptions in interface OptionHandler
public void setOptions(java.lang.String[] options) throws java.lang.Exception

Specified by:
setOptions in interface OptionHandler

Throws:
java.lang.Exception

public java.lang.String[] getBaseOptionsOnly()

Return the base options only (not the subclass's options or the options specific to the configuration).

public java.lang.String debugTipText()

Tip text for this property.

public boolean getDebug()

Get whether to output debug info.

public void setDebug(boolean d)

Set whether to output debug info.

Parameters:
d - true if debug info is to be output.
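A small sketch of the option-handling and debug accessors. The static helper takes any concrete SparkJob; the package names are assumptions of this example rather than something stated on this page.

```java
// Hedged sketch: package names are assumptions.
import weka.core.Utils;
import weka.distributed.spark.SparkJob;

public class OptionsSketch {

  /** Round-trip a job's options and switch on debug output. */
  static void configure(SparkJob job) throws Exception {
    job.setDebug(true); // mirrored by getDebug()/debugTipText()

    String[] all = job.getOptions();          // full option array for the job
    String[] base = job.getBaseOptionsOnly(); // base options only (no subclass/config options)

    // getOptions() output is valid input to setOptions()
    job.setOptions(all);

    System.out.println("base options: " + Utils.joinOptions(base));
  }
}
```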
public org.apache.log4j.WriterAppender initSparkLogAppender()

Initialize and return an appender for hooking into Spark's log4j logging and directing it to Weka's log.

public void removeSparkLogAppender(org.apache.log4j.WriterAppender appender)

Remove the supplied appender from Spark's logging.

Parameters:
appender - the appender to remove
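A sketch of the intended pairing: attach the appender before work that triggers Spark logging and always detach it afterwards. The package name is an assumption.

```java
// Hedged sketch: package name is an assumption.
import org.apache.log4j.WriterAppender;

import weka.distributed.spark.SparkJob;

public class LoggingSketch {

  /** Redirect Spark's log4j output into Weka's log for the duration of some work. */
  static void withSparkLogging(SparkJob job, Runnable work) {
    WriterAppender appender = job.initSparkLogAppender();
    try {
      work.run();
    } finally {
      // Always detach the appender, even if the work fails
      job.removeSparkLogAppender(appender);
    }
  }
}
```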
public CachingStrategy getCachingStrategy()

Get the caching strategy to use for this job.

public void setCachingStrategy(CachingStrategy cs)

Set the caching strategy to use for this job.

Parameters:
cs - the caching strategy to use for this job
public org.apache.spark.api.java.JavaRDD<Instance> loadInstanceObjectFile(java.lang.String path, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) throws java.io.IOException

Load a file/directory of serialized instances (as stored in Spark object file format).

Parameters:
path - the path to the file or directory to load
sparkContext - the context to use
strategy - the optional caching strategy to use
minPartitions - the minimum number of partitions/slices to create (may be <= 0 to indicate that the default should be used)
enforceMaxPartitions - if true then any max partitions specified by the user will be enforced (this might trigger a shuffle operation)

Returns:
the JavaRDD<Instance> dataset

Throws:
java.io.IOException - if a problem occurs
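A sketch of calling loadInstanceObjectFile from helper code. It is an instance method, so it needs a SparkJob to call it on; the path and package names here are assumptions.

```java
// Hedged sketch: package names and path are assumptions.
import java.io.IOException;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import weka.core.Instance;
import weka.distributed.spark.SparkJob;

public class ObjectFileLoadSketch {

  /** Re-load an RDD of instances previously saved in Spark object file format. */
  static JavaRDD<Instance> loadSaved(SparkJob job, JavaSparkContext sc) throws IOException {
    return job.loadInstanceObjectFile(
      "hdfs://namenode:8020/user/weka/train-objects", // hypothetical path
      sc,
      job.getCachingStrategy(), // reuse the job's current (optional) caching strategy
      -1,                       // <= 0: use the default number of partitions
      true);                    // enforce any user-specified maximum partitions
  }
}
```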
public org.apache.spark.api.java.JavaRDD<Instance> stringRDDToInstanceRDD(org.apache.spark.api.java.JavaRDD<java.lang.String> input, Instances headerNoSummary, java.lang.String csvParseOptions, CachingStrategy strategy, boolean enforceMaxPartitions)

Process an RDD<String> into an RDD<Instance>.

Parameters:
input - the RDD<String> input
headerNoSummary - the header of the data without summary attributes
csvParseOptions - the options for the CSV parser
strategy - the optional caching strategy to use
enforceMaxPartitions - if true then any max partitions specified by the user will be enforced (this might trigger a shuffle operation)

Returns:
the JavaRDD<Instance> dataset
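A sketch showing how an RDD of CSV lines obtained elsewhere (for example via JavaSparkContext.textFile) might be converted. The attribute names, the empty csvParseOptions string, the path, and the package names are assumptions.

```java
// Hedged sketch: header layout, path, parse options, and package names are assumptions.
import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import weka.core.Attribute;
import weka.core.Instance;
import weka.core.Instances;
import weka.distributed.spark.SparkJob;

public class StringRDDConversionSketch {

  /** Convert an already-loaded RDD of CSV lines into Weka instances. */
  static JavaRDD<Instance> toInstances(SparkJob job, JavaSparkContext sc) {
    // Header describing the columns of the CSV data (no summary attributes)
    ArrayList<Attribute> atts = new ArrayList<>();
    atts.add(new Attribute("sepal.length"));
    atts.add(new Attribute("sepal.width"));
    Instances header = new Instances("csv-data", atts, 0);

    // Hypothetical input location
    JavaRDD<String> lines = sc.textFile("hdfs://namenode:8020/user/weka/data.csv");

    return job.stringRDDToInstanceRDD(
      lines,
      header,
      "",                       // csvParseOptions: assumes the parser's defaults are acceptable
      job.getCachingStrategy(), // optional caching strategy
      true);                    // enforce any user-specified maximum partitions
  }
}
```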
public org.apache.spark.api.java.JavaRDD<Instance> loadCSVFile(java.lang.String path, Instances headerNoSummary, java.lang.String csvParseOptions, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) throws java.io.IOException

Load a file/directory containing instances in CSV format.

Parameters:
path - the path to the file or directory to load
headerNoSummary - the header to use (sans summary attributes)
csvParseOptions - options to the CSV parser
sparkContext - the context to use
strategy - the optional caching strategy to use
minPartitions - the minimum number of partitions/slices to create (may be <= 0 to indicate that the default should be used)
enforceMaxPartitions - if true then any max partitions specified by the user will be enforced (this might trigger a shuffle operation)

Returns:
the JavaRDD<Instance> dataset

Throws:
java.io.IOException - if a problem occurs
public org.apache.spark.api.java.JavaRDD<Instance> loadInput(java.lang.String inputPath, Instances headerNoSummary, java.lang.String csvParseOptions, org.apache.spark.api.java.JavaSparkContext sparkContext, CachingStrategy strategy, int minPartitions, boolean enforceMaxPartitions) throws java.io.IOException

Load an input file/directory.

Parameters:
inputPath - the path to the file or directory to load
headerNoSummary - the header of the data (sans summary attributes)
csvParseOptions - options to the CSV parser (used if the source is CSV)
sparkContext - the context to use
strategy - the caching strategy to use
minPartitions - the minimum number of partitions/slices to create (may be <= 0 to indicate that the default should be used)
enforceMaxPartitions - if true then any max partitions specified by the user will be enforced (this might trigger a shuffle operation)

Returns:
the JavaRDD<Instance> dataset

Throws:
java.io.IOException - if a problem occurs
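A sketch of loadInput, which picks the appropriate loader for the given path (a CSV file in this example). The header construction, path, empty csvParseOptions string, and package names are assumptions.

```java
// Hedged sketch: header layout, path, parse options, and package names are assumptions.
import java.io.IOException;
import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import weka.core.Attribute;
import weka.core.Instance;
import weka.core.Instances;
import weka.distributed.spark.SparkJob;

public class LoadInputSketch {

  static JavaRDD<Instance> load(SparkJob job, JavaSparkContext sc) throws IOException {
    // Header (sans summary attributes) describing the incoming data
    ArrayList<Attribute> atts = new ArrayList<>();
    atts.add(new Attribute("x1"));
    atts.add(new Attribute("x2"));
    Instances header = new Instances("input", atts, 0);

    return job.loadInput(
      "hdfs://namenode:8020/user/weka/input.csv", // hypothetical input path
      header,
      "",                       // csvParseOptions (used only if the source is CSV)
      sc,
      job.getCachingStrategy(),
      -1,                       // <= 0: use the default number of partitions
      true);                    // enforce any user-specified maximum partitions
  }
}
```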
public SparkJobConfig getSparkJobConfig()

Get the SparkJobConfig object for this job.

public org.apache.spark.api.java.JavaSparkContext getSparkContext()

Get the current Spark context in use by this (and potentially other) jobs.

public void setDataset(java.lang.String key, Dataset dataset)

Set a dataset for this job to potentially make use of.

Parameters:
key - the name of the dataset
dataset - the dataset itself

public Dataset<?> getDataset(java.lang.String key)

Return a named dataset, or null if the name is unknown.

Parameters:
key - the name of the dataset to get

public java.util.Iterator<java.util.Map.Entry<java.lang.String,Dataset<?>>> getDatasets()

Return an iterator over the named datasets for this job.
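A sketch of how the dataset registry might be used to hand a dataset produced by one job to another, keyed by the TRAINING_DATA constant. Dataset's own API is not documented on this page, so the example only moves an already-registered dataset around; the package names are assumptions.

```java
// Hedged sketch: package names are assumptions; Dataset construction is not shown
// because its API is outside this page.
import java.util.Iterator;
import java.util.Map;

import weka.distributed.spark.Dataset;
import weka.distributed.spark.SparkJob;

public class DatasetRegistrySketch {

  static void shareTrainingData(SparkJob producer, SparkJob consumer) {
    // Fetch the dataset the producer registered under the standard training key
    Dataset<?> training = producer.getDataset(SparkJob.TRAINING_DATA);
    if (training != null) {
      consumer.setDataset(SparkJob.TRAINING_DATA, training);
    }

    // List everything the consumer now has access to
    Iterator<Map.Entry<String, Dataset<?>>> it = consumer.getDatasets();
    while (it.hasNext()) {
      System.out.println("dataset key: " + it.next().getKey());
    }
  }
}
```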
public org.apache.log4j.WriterAppender initJob(org.apache.spark.api.java.JavaSparkContext context) throws java.lang.Exception

Initialize logging and set or create a context to use.

Parameters:
context - the context to use (or null to create a new context)

Throws:
java.lang.Exception - if a problem occurs

public void shutdownJob(org.apache.log4j.WriterAppender logAppender)

Shuts down the context in use by this job and removes the supplied log appender object (if any) from the Spark logger.

Parameters:
logAppender - the log appender to remove from the Spark logger. May be null.
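One plausible way to drive a job from client code, pairing initJob with shutdownJob. Whether runJob() performs this setup itself is not stated on this page, so the sketch calls runJobWithContext directly; package names are assumptions.

```java
// Hedged sketch: package names are assumptions.
import org.apache.log4j.WriterAppender;

import weka.distributed.spark.SparkJob;

public class JobLifecycleSketch {

  static boolean execute(SparkJob job) throws Exception {
    // Passing null asks initJob to create a new context (from the job's SparkJobConfig)
    WriterAppender appender = job.initJob(null);
    try {
      return job.runJobWithContext(job.getSparkContext());
    } finally {
      // Stops the context and detaches the log appender; a null appender is allowed
      job.shutdownJob(appender);
    }
  }
}
```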
public abstract boolean runJobWithContext(org.apache.spark.api.java.JavaSparkContext sparkContext) throws java.io.IOException, weka.distributed.DistributedWekaException

Clients (subclasses) implement this to do the actual work of the job.

Parameters:
sparkContext - the context to use

Throws:
java.io.IOException - if an IO problem occurs
weka.distributed.DistributedWekaException - if any other problem occurs
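A minimal sketch of the contract this method asks clients to fulfil: a concrete subclass that loads its input and reports whether any instances were found. The package names, input path, header layout, and empty csvParseOptions string are assumptions.

```java
// Hedged sketch: package names, path, header layout, and parse options are assumptions.
import java.io.IOException;
import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import weka.core.Attribute;
import weka.core.Instance;
import weka.core.Instances;
import weka.distributed.DistributedWekaException;
import weka.distributed.spark.SparkJob;

public class InstanceCountSparkJob extends SparkJob {

  public InstanceCountSparkJob() {
    super("Instance count job", "Counts the instances in the input data");
  }

  @Override
  public boolean runJobWithContext(JavaSparkContext sparkContext)
      throws IOException, DistributedWekaException {
    // Header (sans summary attributes) describing the expected input columns
    ArrayList<Attribute> atts = new ArrayList<>();
    atts.add(new Attribute("x1"));
    atts.add(new Attribute("x2"));
    Instances header = new Instances("input", atts, 0);

    JavaRDD<Instance> data = loadInput(
      "hdfs://namenode:8020/user/weka/input.csv", // hypothetical path
      header, "", sparkContext, getCachingStrategy(), -1, true);

    // Report success if anything was loaded
    return data.count() > 0;
  }
}
```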
public boolean runJob() throws weka.distributed.DistributedWekaException

runJob in class distributed.core.DistributedJob

Throws:
weka.distributed.DistributedWekaException