Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/main/java/org/fisco/bcos/sdk/v3/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class ClientImpl implements Client {
private GroupNodeIniConfig groupNodeIniConfig;
private CryptoSuite cryptoSuite;
private RpcJniObj rpcJniObj;
private boolean started;
private boolean stopped;
private boolean destroyed;

protected final ObjectMapper objectMapper = getObjectMapper();

Expand Down Expand Up @@ -1594,21 +1597,27 @@ public void getFilterLogsAsync(LogFilterResponse filter, RespCallback<LogWrapper
}

@Override
public void start() {
if (rpcJniObj != null) {
public synchronized void start() {
if (!destroyed && rpcJniObj != null && (!started || stopped)) {
rpcJniObj.start();
started = true;
stopped = false;
}
}

@Override
public void stop() {
if (rpcJniObj != null) {
public synchronized void stop() {
if (!destroyed && started && !stopped && rpcJniObj != null) {
rpcJniObj.stop();
stopped = true;
}
}

@Override
public void destroy() {
public synchronized void destroy() {
if (destroyed) {
return;
}
if (rpcJniObj != null) {
BcosSDKJniObj.destroy(rpcJniObj.getNativePointer());
rpcJniObj = null;
Expand All @@ -1617,6 +1626,9 @@ public void destroy() {
cryptoSuite.destroy();
cryptoSuite = null;
}
started = false;
stopped = true;
destroyed = true;
}

public static <T extends JsonRpcResponse<?>> ResponseCallback createResponseCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface EventSubscribe {
*/
static EventSubscribe build(String group, ConfigOption configOption) throws JniException {
Client client = Client.build(group, configOption);
return new EventSubscribeImp(client, configOption);
return new EventSubscribeImp(client, configOption, true);
}

/**
Expand Down
67 changes: 57 additions & 10 deletions src/main/java/org/fisco/bcos/sdk/v3/eventsub/EventSubscribeImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.fisco.bcos.sdk.jni.BcosSDKJniObj;
import org.fisco.bcos.sdk.jni.common.JniException;
import org.fisco.bcos.sdk.jni.event.EventSubJniObj;
import org.fisco.bcos.sdk.v3.client.Client;
Expand All @@ -37,14 +38,25 @@ public class EventSubscribeImp implements EventSubscribe {
private String groupId;
private ConfigOption configOption;
private CryptoSuite cryptoSuite;
private final Client ownerClient;
private final boolean ownsClient;
private EventSubJniObj eventSubJniObj;
private boolean stopped;
private boolean destroyed;

private final ObjectMapper objectMapper = ObjectMapperFactory.getObjectMapper();

public EventSubscribeImp(Client client, ConfigOption configOption) throws JniException {
this(client, configOption, false);
}

EventSubscribeImp(Client client, ConfigOption configOption, boolean ownsClient)
throws JniException {
this.groupId = client.getGroup();
this.configOption = configOption;
this.cryptoSuite = client.getCryptoSuite();
this.ownerClient = client;
this.ownsClient = ownsClient;
this.eventSubJniObj = EventSubJniObj.build(client.getNativePointer());
this.configOption = client.getConfigOption();

Expand Down Expand Up @@ -186,30 +198,65 @@ public String subscribeEvent(EventSubParams params, EventSubCallback callback) {

@Override
public void unsubscribeEvent(String eventId) {
eventSubJniObj.unsubscribeEvent(eventId);
if (eventSubJniObj != null) {
eventSubJniObj.unsubscribeEvent(eventId);
}
}

@Override
public Set<String> getAllSubscribedEvents() {
// TODO: impl
return null;
if (eventSubJniObj == null) {
return Collections.emptySet();
}
Set<String> subscribedEvents = eventSubJniObj.getAllSubscribedEvents();
return subscribedEvents == null ? Collections.emptySet() : subscribedEvents;
}

@Override
public void start() {
eventSubJniObj.start();
public synchronized void start() {
if (destroyed) {
return;
}
if (ownsClient) {
ownerClient.start();
}
stopped = false;
}

@Override
public void stop() {
eventSubJniObj.stop();
public synchronized void stop() {
if (destroyed || stopped) {
return;
}
unsubscribeAllEvents();
if (ownsClient) {
ownerClient.stop();
}
stopped = true;
}

@Override
public void destroy() {
public synchronized void destroy() {
if (destroyed) {
return;
}
stop();
if (eventSubJniObj != null) {
BcosSDKJniObj.destroy(eventSubJniObj.getNativePointer());
eventSubJniObj = null;
}
if (ownsClient) {
ownerClient.destroy();
}
destroyed = true;
}

private void unsubscribeAllEvents() {
Set<String> subscribedEvents = getAllSubscribedEvents();
if (subscribedEvents.isEmpty()) {
return;
}
for (String eventId : new HashSet<>(subscribedEvents)) {
unsubscribeEvent(eventId);
}
}
}
54 changes: 54 additions & 0 deletions src/test/java/org/fisco/bcos/sdk/v3/client/ClientImplTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.fisco.bcos.sdk.v3.client;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import org.fisco.bcos.sdk.jni.rpc.RpcJniObj;
import org.fisco.bcos.sdk.v3.crypto.CryptoSuite;
import org.junit.Test;
import sun.misc.Unsafe;

public class ClientImplTest {

@Test
public void testStopAndDestroyAreIdempotent() throws Exception {
ClientImpl client = (ClientImpl) getUnsafe().allocateInstance(ClientImpl.class);
RpcJniObj rpcJniObj = mock(RpcJniObj.class);
when(rpcJniObj.getNativePointer()).thenReturn(0L);
CryptoSuite cryptoSuite = mock(CryptoSuite.class);

setField(client, "rpcJniObj", rpcJniObj);
setField(client, "cryptoSuite", cryptoSuite);
setBooleanField(client, "started", true);

client.stop();
client.stop();
client.destroy();
client.destroy();

verify(rpcJniObj, times(1)).stop();
verify(cryptoSuite, times(1)).destroy();
}

private static Unsafe getUnsafe() throws Exception {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}

private static void setBooleanField(Object target, String fieldName, boolean value)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.setBoolean(target, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.fisco.bcos.sdk.v3.eventsub;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.fisco.bcos.sdk.jni.event.EventSubJniObj;
import org.fisco.bcos.sdk.v3.client.Client;
import org.junit.Test;

public class EventSubscribeImpTest {

@Test
public void testStopOnSharedClientOnlyUnsubscribesOnce() throws Exception {
Client client = mock(Client.class);
when(client.getGroup()).thenReturn("group0");
when(client.getNativePointer()).thenReturn(0L);

EventSubscribeImp eventSubscribe = new EventSubscribeImp(client, null);
EventSubJniObj eventSubJniObj = mock(EventSubJniObj.class);
when(eventSubJniObj.getAllSubscribedEvents())
.thenReturn(new HashSet<>(Arrays.asList("event-a", "event-b")));
setField(eventSubscribe, "eventSubJniObj", eventSubJniObj);

eventSubscribe.stop();
eventSubscribe.stop();

verify(eventSubJniObj, times(1)).getAllSubscribedEvents();
verify(eventSubJniObj, times(1)).unsubscribeEvent("event-a");
verify(eventSubJniObj, times(1)).unsubscribeEvent("event-b");
verify(eventSubJniObj, never()).stop();
verify(client, never()).stop();
}

@Test
public void testDestroyOnOwnedClientDelegatesLifecycleOnce() throws Exception {
Client client = mock(Client.class);
when(client.getGroup()).thenReturn("group0");
when(client.getNativePointer()).thenReturn(0L);

EventSubscribeImp eventSubscribe = new EventSubscribeImp(client, null, true);
EventSubJniObj eventSubJniObj = mock(EventSubJniObj.class);
when(eventSubJniObj.getAllSubscribedEvents())
.thenReturn(new HashSet<>(Arrays.asList("event-a")));
setField(eventSubscribe, "eventSubJniObj", eventSubJniObj);

eventSubscribe.destroy();
eventSubscribe.destroy();

verify(eventSubJniObj, times(1)).unsubscribeEvent("event-a");
verify(eventSubJniObj, never()).stop();
verify(client, times(1)).stop();
verify(client, times(1)).destroy();
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}
}