diff --git a/pom.xml b/pom.xml index ba08baeea5..04743e4f93 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,8 @@ 1.5.4 3.0.0 4.13.2 + org.fusesource.leveldbjni + 0.12 1.8 1.0.0 3.1.0 @@ -810,10 +812,15 @@ ${bouncycastle.version} - org.fusesource.leveldbjni + ${leveldbjni.group} leveldbjni-all ${leveldbjni-all.version} + + org.iq80.leveldb + leveldb + ${leveldb.version} + org.xerial.snappy snappy-java @@ -1358,6 +1365,18 @@ + + aarch64 + + org.openlabtesting.leveldbjni + + + + linux + aarch64 + + + diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml index 61535adc53..d08433f99f 100644 --- a/tez-plugins/tez-aux-services/pom.xml +++ b/tez-plugins/tez-aux-services/pom.xml @@ -49,6 +49,12 @@ org.apache.hadoop hadoop-yarn-server-common provided + + + org.fusesource.leveldbjni + leveldbjni-all + + org.apache.hadoop @@ -64,6 +70,10 @@ io.netty * + + org.fusesource.leveldbjni + leveldbjni-all + @@ -90,6 +100,12 @@ org.apache.hadoop hadoop-mapreduce-client-shuffle provided + + + org.fusesource.leveldbjni + leveldbjni-all + + org.apache.tez @@ -110,10 +126,14 @@ provided - org.fusesource.leveldbjni + ${leveldbjni.group} leveldbjni-all provided + + org.iq80.leveldb + leveldb + io.netty netty-all @@ -141,6 +161,10 @@ io.netty * + + org.fusesource.leveldbjni + leveldbjni-all + @@ -154,6 +178,12 @@ hadoop-yarn-server-tests test test-jar + + + org.fusesource.leveldbjni + leveldbjni-all + + org.apache.hadoop diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 2144f62dbe..c7b830e8b2 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -29,8 +29,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.fusesource.leveldbjni.JniDBFactory.asString; -import static org.fusesource.leveldbjni.JniDBFactory.bytes; +import static java.nio.charset.StandardCharsets.UTF_8; import java.io.File; import java.io.FileNotFoundException; @@ -89,7 +88,6 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; -import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.library.common.Constants; @@ -149,10 +147,10 @@ import io.netty.util.CharsetUtil; import io.netty.util.concurrent.GlobalEventExecutor; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBFactory; +import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Logger; import org.iq80.leveldb.Options; import org.slf4j.LoggerFactory; @@ -210,6 +208,7 @@ public class ShuffleHandler extends AuxiliaryService { private JobTokenSecretManager secretManager; private DB stateDb = null; + private static final DBFactory DB_FACTORY = createDBFactory(); public static final String TEZ_SHUFFLE_SERVICEID = "tez_shuffle"; @@ -643,6 +642,7 @@ protected void serviceStop() throws Exception { destroyPipeline(); if (stateDb != null) { stateDb.close(); + stateDb = null; } super.serviceStop(); } @@ -671,9 +671,9 @@ private void recoverState(Configuration conf) throws IOException { if (recoveryRoot != null) { startStore(recoveryRoot); Pattern jobPattern = Pattern.compile(JobID.JOBID_REGEX); - LeveldbIterator iter = null; + DBIterator iter = null; try { - iter = new LeveldbIterator(stateDb); + iter = stateDb.iterator(); iter.seek(bytes(JobID.JOB)); while (iter.hasNext()) { Map.Entry entry = iter.next(); @@ -695,26 +695,23 @@ private void recoverState(Configuration conf) throws IOException { private void startStore(Path recoveryRoot) throws IOException { Options options = new Options(); - options.createIfMissing(false); options.logger(new LevelDBLogger()); Path dbPath = new Path(recoveryRoot, STATE_DB_NAME); LOG.info("Using state database at " + dbPath + " for recovery"); File dbfile = new File(dbPath.toString()); + boolean dbExists = dbfile.exists(); + options.createIfMissing(!dbExists); try { - stateDb = JniDBFactory.factory.open(dbfile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating state database at " + dbfile); - options.createIfMissing(true); - try { - stateDb = JniDBFactory.factory.open(dbfile, options); - storeVersion(); - } catch (DBException dbExc) { - throw new IOException("Unable to create state store", dbExc); - } - } else { - throw e; + stateDb = DB_FACTORY.open(dbfile, options); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; } + throw new IOException("Unable to open state store", e); + } + if (!dbExists) { + LOG.info("Created state database at " + dbfile); + storeVersion(); } checkVersion(); } @@ -843,6 +840,27 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException { } } + private static DBFactory createDBFactory() { + // Try native JNI LevelDB first (fastest), fall back to pure-Java iq80 + try { + DBFactory jniFactory = org.fusesource.leveldbjni.JniDBFactory.factory; + LOG.info("Using JNI LevelDB factory"); + return jniFactory; + } catch (Throwable t) { + LOG.info("Native LevelDB JNI library not available ({}), " + + "falling back to pure-Java implementation", t.getMessage()); + return new org.iq80.leveldb.impl.Iq80DBFactory(); + } + } + + private static byte[] bytes(String value) { + return value.getBytes(UTF_8); + } + + private static String asString(byte[] value) { + return new String(value, UTF_8); + } + private static class LevelDBLogger implements Logger { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);