001/*
002 * JPPF.
003 * Copyright (C) 2005-2018 JPPF Team.
004 * http://www.jppf.org
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.jppf.client;
020
021import java.io.ByteArrayInputStream;
022import java.util.*;
023
024import org.jppf.job.*;
025import org.jppf.job.persistence.*;
026import org.jppf.management.JMXDriverConnectionWrapper;
027import org.jppf.node.protocol.*;
028import org.jppf.serialization.JPPFSerialization;
029import org.jppf.utils.StringUtils;
030import org.slf4j.*;
031
032/**
033 * Instances of this class allow monitoring and managing, on the client side, the jobs persisted in a remote driver.
034 * In particular, it allows to retrieve jobs from the driver's persistence store and either process their results if they have completed,
035 * or resubmit them vith a {@link JPPFClient}.
036 * <p>The communication with the driver is performed via JMX, thus a working JMX connection to the driver must be provided in the constructor.
037 * @author Laurent Cohen
038 * @since 6.0
039 */
040public class JPPFDriverJobPersistence {
041  /**
042   * Logger for this class.
043   */
044  private static Logger log = LoggerFactory.getLogger(JPPFDriverJobPersistence.class);
045  /**
046   * Determines whether the debug level is enabled in the log configuration, without the cost of a method call.
047   */
048  private static boolean debugEnabled = log.isDebugEnabled();
049  /**
050   * Determines whether the trace level is enabled in the log configuration, without the cost of a method call.
051   */
052  private static boolean traceEnabled = log.isTraceEnabled();
053  /**
054   * A JMX connection wrapper to a JPPF driver.
055   */
056  private final JMXDriverConnectionWrapper jmx;
057  /**
058   * A proxy to the persisted jobs manager MBean.
059   */
060  private final PersistedJobsManagerMBean persistedJobsManager;
061
062  /**
063   * Initialize this persisted job manager with the specified driver JMX connection.
064   * @param jmx a JMX connection wrapper to a JPPF driver.
065   * @throws IllegalStateException if the connection to the driver isn't working for any reason. The actual exception is set as root cause.
066   */
067  public JPPFDriverJobPersistence(final JMXDriverConnectionWrapper jmx) {
068    this.jmx = jmx;
069    try {
070      this.persistedJobsManager = this.jmx.getPersistedJobsManager();
071      if (this.persistedJobsManager == null) throw new IllegalStateException("persistedJobsManager is null");
072    } catch (final Exception e) {
073      throw new IllegalStateException(e);
074    }
075  }
076
077  /**
078   * List the persisted jobs that match the provided job selector.
079   * @param selector the selector used to filter persisted jobs, a {@code null} selector is equivalent to {@link JobSelector#ALL_JOBS}.
080   * @return a list of the uuids of the matching jobs, possibly empty if no job was found.
081   * @throws Exception if any error occurs while communicating with the driver.
082   */
083  public List<String> listJobs(final JobSelector selector) throws Exception {
084    final List<String> result = persistedJobsManager.getPersistedJobUuids(selector);
085    if (debugEnabled) log.debug("found jobs: {}", result);
086    return result;
087  }
088
089  /**
090   * Delete the persisted job with the specified uuid. This method is equivalent to calling {@link #deleteJobs(JobSelector) deleteJobs(new JobUuidSelector(uuid))}.
091   * @param uuid the UUID of the job to delete.
092   * @return {@code true} if the job was successfully deleted, {@code false}.otherwise.
093   * @throws Exception if any error occurs while communicating with the driver.
094   */
095  public boolean deleteJob(final String uuid) throws Exception {
096    final List<String> result = persistedJobsManager.deletePersistedJobs(new JobUuidSelector(uuid));
097    return (result != null) && result.contains(uuid);
098  }
099
100  /**
101   * Delete the persisted jobs that match the provided job selector.
102   * @param selector the selector used to filter persisted jobs, a {@code null} selector is equivalent to {@link JobSelector#ALL_JOBS}.
103   * @return a list of the uuids of the matching jobs that were actually deleted, possibly empty if no job was found.
104   * @throws Exception if any error occurs while communicating with the driver.
105   */
106  public List<String> deleteJobs(final JobSelector selector) throws Exception {
107    final List<String> result = persistedJobsManager.deletePersistedJobs(selector);
108    if (debugEnabled) log.debug("deleted jobs: {}", result);
109    return result;
110  }
111
112  /**
113   * Retieve and rebuild the persisted job with the specified uuid.
114   * @param uuid the UUID of the job to delete.
115   * @return a {@link JPPFJob} instance, or {@code null} if the job could not be found.
116   * @throws Exception if any error occurs while communicating with the driver.
117   */
118  public JPPFJob retrieveJob(final String uuid) throws Exception {
119    final TaskBundle header = load(uuid, PersistenceObjectType.JOB_HEADER, -1);
120    if (debugEnabled) log.debug("got job header for uuid={} : {}", uuid, header);
121    if (header == null) return null;
122    final JPPFJob job = new JPPFJob(header.getUuid());
123    job.setName(header.getName());
124    job.setSLA(header.getSLA());
125    job.setMetadata(header.getMetadata());
126    final int[][] positions = persistedJobsManager.getPersistedJobPositions(uuid);
127    if (debugEnabled) log.debug("got task positions for uuid={} : {}", uuid, StringUtils.buildString(", ", "{", "}", positions[0]));
128    if (debugEnabled) log.debug("got result positions for uuid={} : {}", uuid, StringUtils.buildString(", ", "{", "}", positions[1]));
129    for (int i=0; i<2; i++) Arrays.sort(positions[i]);
130    final List<PersistenceInfo> toLoad = new ArrayList<>(1 + positions[0].length + positions[1].length);
131    toLoad.add(new PersistenceInfoImpl(uuid, null, PersistenceObjectType.DATA_PROVIDER, -1, null));
132    for (int i=0; i<positions[0].length; i++) toLoad.add(new PersistenceInfoImpl(uuid, null, PersistenceObjectType.TASK, positions[0][i], null));
133    for (int i=0; i<positions[1].length; i++) toLoad.add(new PersistenceInfoImpl(uuid, null, PersistenceObjectType.TASK_RESULT, positions[1][i], null));
134    long requestId = -1L;
135    try {
136      requestId = persistedJobsManager.requestLoad(toLoad);
137      final DataProvider dataProvider = load(requestId, uuid, PersistenceObjectType.DATA_PROVIDER, -1);
138      if (traceEnabled) log.trace("got dataprovider for uuid={} : {}", uuid, dataProvider);
139      job.setDataProvider(dataProvider);
140      for (int i=0; i<positions[0].length; i++) {
141        final Task<?> task = load(requestId, uuid, PersistenceObjectType.TASK, positions[0][i]);
142        if (traceEnabled) log.trace(String.format("got task at position %d for uuid=%s : %s", positions[0][i], uuid, task));
143        job.add(task);
144      }
145      final List<Task<?>> results = new ArrayList<>(positions[1].length);
146      for (int i=0; i<positions[1].length; i++) {
147        final Task<?> task = load(requestId, uuid, PersistenceObjectType.TASK_RESULT, positions[1][i]);
148        if (traceEnabled) log.trace(String.format("got task result at position %d for uuid=%s : %s", positions[1][i], uuid, task));
149        results.add(task);
150      }
151      job.getResults().addResults(results);
152      if (job.unexecutedTaskCount() <= 0) job.setStatus(JobStatus.COMPLETE);
153    } finally {
154      if (requestId >= 0L) persistedJobsManager.deleteLoadRequest(requestId);
155    }
156    return job;
157  }
158
159  /**
160   * Get the description of the job with the specified uuid. This method retrieves the job's uuid, name, number of tasks, SLA and metadata.
161   * @param uuid the uuid of the job to retrieve.
162   * @return the job descirption as a {@link JPPFDistributedJob} instance.
163   * @throws Exception if any error occurs while communicating with the driver.
164   */
165  public JPPFDistributedJob getJobDescription(final String uuid) throws Exception {
166    return load(uuid, PersistenceObjectType.JOB_HEADER, -1);
167  }
168
169  /**
170   * Determines whether the job has completed and all execution results are available.
171   * @param uuid the UUID of the jonb to check.
172   * @return {@code true} if the job has completed, {@code false} otherwise.
173   * @throws Exception if any error occurs while communicating with the driver.
174   */
175  public boolean isJobComplete(final String uuid) throws Exception {
176    return persistedJobsManager.isJobComplete(uuid);
177  }
178
179  /**
180   * Load an object that is part of a job from the driver's pereistence store.
181   * @param <T> the runtime tpe of the object to retrieve.
182   * @param uuid the the job uuid.
183   * @param type the type of object to load.
184   * @param position the position of the object, if applicable.
185   * @return the loaded object.
186   * @throws Exception if any error occurs while communicating with the driver.
187   */
188  @SuppressWarnings("unchecked")
189  private <T> T load(final String uuid, final PersistenceObjectType type, final int position) throws Exception {
190    final byte[] bytes = (byte[]) persistedJobsManager.getPersistedJobObject(uuid, type, position);
191    if (bytes == null) return null;
192    if (traceEnabled) log.trace("got byte[{}]", bytes.length);
193    try (final ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
194      return (T) JPPFSerialization.Factory.getSerialization().deserialize(is);
195    }
196  }
197
198  /**
199   * Load an object that is part of a job from the driver's pereistence store.
200   * @param <T> the runtime tpe of the object to retrieve.
201   * @param requestId id of the preload request.
202   * @param uuid the the job uuid.
203   * @param type the type of object to load.
204   * @param position the position of the object, if applicable.
205   * @return the loaded object.
206   * @throws Exception if any error occurs while communicating with the driver.
207   */
208  @SuppressWarnings("unchecked")
209  private <T> T load(final long requestId, final String uuid, final PersistenceObjectType type, final int position) throws Exception {
210    final byte[] bytes = (byte[]) persistedJobsManager.getPersistedJobObject(requestId, uuid, type, position);
211    if (bytes == null) return null;
212    if (traceEnabled) log.trace("got byte[{}]", bytes.length);
213    try (final ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
214      return (T) JPPFSerialization.Factory.getSerialization().deserialize(is);
215    }
216  }
217}