diff --git a/pom.xml b/pom.xml
index ba08baeea5..04743e4f93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@
1.5.4
3.0.0
4.13.2
+ org.fusesource.leveldbjni
+ 0.12
1.8
1.0.0
3.1.0
@@ -810,10 +812,15 @@
${bouncycastle.version}
- org.fusesource.leveldbjni
+ ${leveldbjni.group}
leveldbjni-all
${leveldbjni-all.version}
+
+ org.iq80.leveldb
+ leveldb
+ ${leveldb.version}
+
org.xerial.snappy
snappy-java
@@ -1358,6 +1365,18 @@
+
+ aarch64
+
+ org.openlabtesting.leveldbjni
+
+
+
+ linux
+ aarch64
+
+
+
diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml
index 61535adc53..d08433f99f 100644
--- a/tez-plugins/tez-aux-services/pom.xml
+++ b/tez-plugins/tez-aux-services/pom.xml
@@ -49,6 +49,12 @@
org.apache.hadoop
hadoop-yarn-server-common
provided
+
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
+
org.apache.hadoop
@@ -64,6 +70,10 @@
io.netty
*
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
@@ -90,6 +100,12 @@
org.apache.hadoop
hadoop-mapreduce-client-shuffle
provided
+
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
+
org.apache.tez
@@ -110,10 +126,14 @@
provided
- org.fusesource.leveldbjni
+ ${leveldbjni.group}
leveldbjni-all
provided
+
+ org.iq80.leveldb
+ leveldb
+
io.netty
netty-all
@@ -141,6 +161,10 @@
io.netty
*
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
@@ -154,6 +178,12 @@
hadoop-yarn-server-tests
test
test-jar
+
+
+ org.fusesource.leveldbjni
+ leveldbjni-all
+
+
org.apache.hadoop
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 2144f62dbe..c7b830e8b2 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -29,8 +29,7 @@
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.fusesource.leveldbjni.JniDBFactory.asString;
-import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.File;
import java.io.FileNotFoundException;
@@ -89,7 +88,6 @@
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.Constants;
@@ -149,10 +147,10 @@
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.DBFactory;
+import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.slf4j.LoggerFactory;
@@ -210,6 +208,7 @@ public class ShuffleHandler extends AuxiliaryService {
private JobTokenSecretManager secretManager;
private DB stateDb = null;
+ private static final DBFactory DB_FACTORY = createDBFactory();
public static final String TEZ_SHUFFLE_SERVICEID =
"tez_shuffle";
@@ -643,6 +642,7 @@ protected void serviceStop() throws Exception {
destroyPipeline();
if (stateDb != null) {
stateDb.close();
+ stateDb = null;
}
super.serviceStop();
}
@@ -671,9 +671,9 @@ private void recoverState(Configuration conf) throws IOException {
if (recoveryRoot != null) {
startStore(recoveryRoot);
Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX);
- LeveldbIterator iter = null;
+ DBIterator iter = null;
try {
- iter = new LeveldbIterator(stateDb);
+ iter = stateDb.iterator();
iter.seek(bytes(JobID.JOB));
while (iter.hasNext()) {
Map.Entry entry = iter.next();
@@ -695,26 +695,23 @@ private void recoverState(Configuration conf) throws IOException {
private void startStore(Path recoveryRoot) throws IOException {
Options options = new Options();
- options.createIfMissing(false);
options.logger(new LevelDBLogger());
Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
LOG.info("Using state database at " + dbPath + " for recovery");
File dbfile = new File(dbPath.toString());
+ boolean dbExists = dbfile.exists();
+ options.createIfMissing(!dbExists);
try {
- stateDb = JniDBFactory.factory.open(dbfile, options);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- LOG.info("Creating state database at " + dbfile);
- options.createIfMissing(true);
- try {
- stateDb = JniDBFactory.factory.open(dbfile, options);
- storeVersion();
- } catch (DBException dbExc) {
- throw new IOException("Unable to create state store", dbExc);
- }
- } else {
- throw e;
+ stateDb = DB_FACTORY.open(dbfile, options);
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
}
+ throw new IOException("Unable to open state store", e);
+ }
+ if (!dbExists) {
+ LOG.info("Created state database at " + dbfile);
+ storeVersion();
}
checkVersion();
}
@@ -843,6 +840,27 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
}
}
+ private static DBFactory createDBFactory() {
+ // Try native JNI LevelDB first (fastest), fall back to pure-Java iq80
+ try {
+ DBFactory jniFactory = org.fusesource.leveldbjni.JniDBFactory.factory;
+ LOG.info("Using JNI LevelDB factory");
+ return jniFactory;
+ } catch (Throwable t) {
+ LOG.info("Native LevelDB JNI library not available ({}), "
+ + "falling back to pure-Java implementation", t.getMessage());
+ return new org.iq80.leveldb.impl.Iq80DBFactory();
+ }
+ }
+
+ private static byte[] bytes(String value) {
+ return value.getBytes(UTF_8);
+ }
+
+ private static String asString(byte[] value) {
+ return new String(value, UTF_8);
+ }
+
private static class LevelDBLogger implements Logger {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);