The job manager at the core of Sahara EDP (Elastic Data Processing) was originally written to execute and monitor jobs via an Oozie server. However, the job manager should allow for alternative EDP implementations that can support new cluster types or new job execution environments.
To date, the provisioning plugins released with Sahara all support deployment of an Oozie server. Oozie was a logical choice for the early releases of Sahara EDP because it provided commonality across several plugins and allowed the rapid development of the EDP feature.
However, Oozie is built on top of Hadoop MapReduce, and not every cluster configuration can or should support it (consider a Spark cluster, for example, where Hadoop MapReduce is not a necessary part of the install). As Sahara supports additional cluster types and gains a wider install base, it’s important for EDP to have the flexibility to run on new cluster configurations and new job execution paradigms.
The current implementation of the job_manager has hardcoded dependencies on Oozie. These dependencies should be removed and the job manager should be refactored to support the current Oozie implementation as well as new implementations. The job manager should select the appropriate implementation for EDP operations based on attributes of the cluster.
Sahara EDP requires three basic operations on a job: run the job, get the job status, and cancel the job.
Currently these operations are implemented in job_manager.py with explicit dependencies on the Oozie server and Oozie-style workflows.
To move these dependencies out of the job manager, we can define an abstract class that contains the three operations:
    @six.add_metaclass(abc.ABCMeta)
    class JobEngine(object):

        @abc.abstractmethod
        def cancel_job(self, job_execution):
            pass

        @abc.abstractmethod
        def get_job_status(self, job_execution):
            pass

        @abc.abstractmethod
        def run_job(self, job_execution):
            pass
For each EDP implementation Sahara supports, a class should be derived from JobEngine that contains the details. Each implementation and its supporting files should be contained in its own subdirectory.
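For illustration, a derived engine might look like the sketch below. It uses Python 3's abc.ABC rather than six for self-containment, and the class name, method bodies, and return values are hypothetical; a real engine would talk to an actual Oozie server from its own subdirectory.

```python
import abc


class JobEngine(abc.ABC):
    """The abstract base class from above, in Python 3 style for brevity."""

    @abc.abstractmethod
    def cancel_job(self, job_execution):
        pass

    @abc.abstractmethod
    def get_job_status(self, job_execution):
        pass

    @abc.abstractmethod
    def run_job(self, job_execution):
        pass


class FakeOozieJobEngine(JobEngine):
    """Hypothetical derived engine holding all Oozie-specific details."""

    def run_job(self, job_execution):
        # Submit a workflow and hand back an engine-specific job id
        return "oozie-wf-0001"

    def get_job_status(self, job_execution):
        return "RUNNING"

    def cancel_job(self, job_execution):
        return "KILLED"
```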
Logic can be added to the job manager to allocate a JobEngine based on the cluster and/or job type, and the job manager can call the appropriate method on the allocated JobEngine.
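A minimal sketch of that selection logic is shown below; the `Cluster` stand-in, the `plugin_name` attribute, and the engine class names are assumptions for illustration, not Sahara's actual models.

```python
from collections import namedtuple

# Stand-in for Sahara's cluster object; the attribute name is an assumption.
Cluster = namedtuple("Cluster", ["plugin_name"])


class OozieJobEngine:
    name = "oozie"


class SparkJobEngine:
    name = "spark"


def get_job_engine(cluster, job_type=None):
    """Allocate a JobEngine based on cluster (and possibly job type) attributes."""
    if cluster.plugin_name == "spark":
        return SparkJobEngine()
    # Fall back to the Oozie-based engine for Hadoop-style clusters
    return OozieJobEngine()
```

The job manager would then invoke run_job, get_job_status, or cancel_job on whatever engine this function returns, without knowing which implementation it received.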
As much as possible, the JobEngine classes should be concerned only with implementing the operations. They may read objects from the Sahara database as needed, but modifications to the job execution object should generally be done in job_manager.py (in some cases this may be difficult, or may require optional abstract methods to be added to the JobEngine base class).
For example, the “cancel_job” sequence should look something like this:
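A plausible sketch of this flow follows, with stand-ins for Sahara's conductor and engine; every name here (the conductor methods, the status strings) is an assumption, not the project's real API.

```python
class FakeConductor:
    """Stand-in for Sahara's DB access layer."""

    def __init__(self):
        self.store = {"je-1": {"status": "RUNNING"}}

    def job_execution_get(self, je_id):
        return self.store[je_id]

    def job_execution_update(self, je_id, values):
        self.store[je_id].update(values)


class FakeEngine:
    """The engine only performs the operation; it never writes the DB."""

    def cancel_job(self, job_execution):
        return "KILLED"


def cancel_job(conductor, engine, je_id):
    job_execution = conductor.job_execution_get(je_id)          # manager reads
    status = engine.cancel_job(job_execution)                   # engine cancels
    conductor.job_execution_update(je_id, {"status": status})   # manager updates
    return status
```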
In this example, the job manager has no knowledge of operations in the engine, only the high level logic to orchestrate the operation and update the status.
It may be possible to support “true” plugins for EDP similar to the implementation of the provisioning plugins. In this case, the plugins would be discovered and loaded dynamically at runtime.
However, this requires much more work than a refactoring and the introduction of abstract classes, and it is probably not realistic for the Juno release. Several problems and questions would need to be solved first.
This change should cause only one minor (optional) change to the current data model. Currently, a JobExecution object contains a string field named “oozie_job_id”. A job id field will almost certainly still be needed by all implementations and can probably be stored as a string, but the name should change to “job_id”.
JobExecution objects also contain an “extra” field which in the case of the Oozie implementation is used to store neutron connection info for the Oozie server. Other implementations may need similar data, however since the “extra” field is stored as a JsonDictType no change should be necessary.
Testing will be done primarily through the current unit and integration tests. Tests may be added that test the selection of the job engine.