[go: up one dir, main page]

Skip to content

Commit

Permalink
IGNITE-23518 Add configuration for distributed properties default val…
Browse files Browse the repository at this point in the history
…ues - Fixes apache#11616.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
alex-plekhanov committed Oct 28, 2024
1 parent a82a3ec commit dd86f44
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ public class IgniteConfiguration {
/** Shutdown policy for cluster. */
public ShutdownPolicy shutdown = DFLT_SHUTDOWN_POLICY;

/** Default values for distributed properties. */
private Map<String, String> distrProps;

/**
* Creates valid grid configuration with all default values.
*/
Expand Down Expand Up @@ -732,6 +735,7 @@ public IgniteConfiguration(IgniteConfiguration cfg) {
sqlCfg = cfg.getSqlConfiguration();
shutdown = cfg.getShutdownPolicy();
asyncContinuationExecutor = cfg.getAsyncContinuationExecutor();
distrProps = cfg.getDistributedPropertiesDefaultValues();
}

/**
Expand Down Expand Up @@ -3599,6 +3603,27 @@ public IgniteConfiguration setAsyncContinuationExecutor(Executor asyncContinuati
return this;
}

/**
* Gets default values for distributed properties.
*
* @return Default values for distributed properties.
*/
public Map<String, String> getDistributedPropertiesDefaultValues() {
return distrProps;
}

/**
* Sets default values for distributed properties.
*
* @param distrProps Default values for distributed properties.
* @return {@code this} for chaining.
*/
public IgniteConfiguration setDistributedPropertiesDefaultValues(Map<String, String> distrProps) {
this.distrProps = distrProps;

return this;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedProperty;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.jetbrains.annotations.NotNull;

import static java.lang.String.format;
Expand All @@ -38,24 +40,45 @@ private DistributedConfigurationUtils() {
}

/**
* @param property Property which value should be set.
* @param value Default value.
* @param prop Property which value should be set.
* @param val Default value.
* @param log Logger.
* @param <T> Property type.
*
* @return Future for the operation.
*/
public static <T extends Serializable> void setDefaultValue(DistributedProperty<T> property, T value, IgniteLogger log) {
if (property.get() == null) {
public static <T extends Serializable> IgniteInternalFuture<Void> setDefaultValue(
DistributedProperty<T> prop,
T val,
IgniteLogger log
) {
if (prop.get() == null) {
try {
property.propagateAsync(null, value)
.listen(future -> {
if (future.error() != null)
log.error("Cannot set default value of '" + property.getName() + '\'', future.error());
});
IgniteInternalFuture<Void> fut = (IgniteInternalFuture<Void>)prop.propagateAsync(null, val);

fut.listen(future -> {
if (future.error() != null)
log.error("Cannot set default value of '" + prop.getName() + '\'', future.error());
});

return fut;
}
catch (IgniteCheckedException e) {
log.error("Cannot initiate setting default value of '" + property.getName() + '\'', e);
String errMsg = "Cannot initiate setting default value of '" + prop.getName() + '\'';

log.error(errMsg, e);

return new GridFinishedFuture<>(new IgniteCheckedException(errMsg, e));
}
}
else {
if (log.isDebugEnabled()) {
log.debug("Skip set default value for distributed property [name=" + prop.getName() +
", clusterValue=" + prop.get() + ", defaultValue=" + val + ']');
}

return new GridFinishedFuture<>();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.DistributedConfigurationUtils;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
Expand All @@ -47,7 +52,7 @@ public class DistributedConfigurationProcessor extends GridProcessorAdapter impl
private static final String DIST_CONF_PREFIX = "distrConf-";

/** Properties storage. */
private final Map<String, DistributedChangeableProperty> props = new ConcurrentHashMap<>();
private final Map<String, DistributedChangeableProperty<Serializable>> props = new ConcurrentHashMap<>();

/** Global metastorage. */
private volatile DistributedMetaStorage distributedMetastorage;
Expand All @@ -74,7 +79,7 @@ public DistributedConfigurationProcessor(GridKernalContext ctx) {
distributedMetastorage.listen(
(key) -> key.startsWith(DIST_CONF_PREFIX),
(String key, Serializable oldVal, Serializable newVal) -> {
DistributedChangeableProperty prop = props.get(toPropertyKey(key));
DistributedChangeableProperty<Serializable> prop = props.get(toPropertyKey(key));

if (prop != null)
prop.localUpdate(newVal);
Expand All @@ -94,12 +99,46 @@ public DistributedConfigurationProcessor(GridKernalContext ctx) {
//Switch to cluster wide update action and do it on already registered properties.
switchCurrentActionTo(CLUSTER_WIDE_UPDATE);

isp.getDistributedConfigurationListeners()
.forEach(DistributedConfigurationLifecycleListener::onReadyToWrite);
IgniteInternalFuture<Void> initFut = initDefaultPropertiesValues();

// Notify registered listeners only after propagation of default values.
// Can't wait for initFut in the current thread, since it can block discovery and deadlock is possible.
initFut.listen(fut -> isp.getDistributedConfigurationListeners()
.forEach(DistributedConfigurationLifecycleListener::onReadyToWrite));
}
});
}

/** Init default values for distributed properties. */
private IgniteInternalFuture<Void> initDefaultPropertiesValues() {
Map<String, String> dfltVals = ctx.config().getDistributedPropertiesDefaultValues();

if (F.isEmpty(dfltVals))
return new GridFinishedFuture<>();

GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>() {
@Override protected boolean ignoreFailure(Throwable err) {
// Do not complete the entire compound future if any property failed.
return true;
}
};

for (Map.Entry<String, String> entry : dfltVals.entrySet()) {
DistributedChangeableProperty<Serializable> prop = props.get(entry.getKey());

if (prop == null) {
log.warning("Cannot set default value for distributed property '" + entry.getKey() +
"', property is not registered");

continue;
}

compFut.add(DistributedConfigurationUtils.setDefaultValue(prop, prop.parse(entry.getValue()), log));
}

return compFut.markInitialized();
}

/**
* Switching current action to given action and do all actions from old action to new one.
*
Expand Down Expand Up @@ -160,15 +199,12 @@ private static String toPropertyKey(String metaStorageKey) {
* @return Public properties.
*/
public List<DistributedChangeableProperty<Serializable>> properties() {
return props.values().stream()
.filter(p -> p instanceof DistributedChangeableProperty)
.map(p -> (DistributedChangeableProperty<Serializable>)p)
.collect(Collectors.toList());
return U.sealList(props.values());
}

/** {@inheritDoc} */
@Override public @Nullable <T extends Serializable> DistributedChangeableProperty<T> property(String name) {
DistributedChangeableProperty<T> p = props.get(name);
DistributedChangeableProperty<T> p = (DistributedChangeableProperty<T>)props.get(name);

if (!(p instanceof DistributedChangeableProperty))
return null;
Expand Down
Loading

0 comments on commit dd86f44

Please sign in to comment.