Index: solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java =================================================================== --- solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (revision 1721301) +++ solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (working copy) @@ -56,7 +56,7 @@ import static org.apache.solr.common.util.Utils.fromJSON; public class ZkStateReader implements Closeable { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String BASE_URL_PROP = "base_url"; public static final String NODE_NAME_PROP = "node_name"; @@ -146,9 +146,7 @@ String configName = null; String path = COLLECTIONS_ZKNODE + "/" + collection; - if (log.isInfoEnabled()) { - log.info("Load collection config from:" + path); - } + LOG.info("Load collection config from: [{}]", path); try { byte[] data = zkClient.getData(path, null, null, true); @@ -160,12 +158,10 @@ if (configName != null) { if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) { - log.error("Specified config does not exist in ZooKeeper:" + configName); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, - "Specified config does not exist in ZooKeeper:" + configName); - } else if (log.isInfoEnabled()) { - log.info("path={} {}={} specified config exists in ZooKeeper", - new Object[] {path, CONFIGNAME_PROP, configName}); + LOG.error("Specified config does not exist in ZooKeeper: [{}]", configName); + throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "Specified config does not exist in ZooKeeper: " + configName); + } else { + LOG.info("path=[{}] [{}]=[{}] specified config exists in ZooKeeper", path, CONFIGNAME_PROP, configName); } } else { throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path); @@ -183,16 +179,6 @@ } - private static class ZKTF implements ThreadFactory { - private static ThreadGroup tg = new ThreadGroup("ZkStateReader"); - @Override - public Thread newThread(Runnable r) { - Thread td = new Thread(tg, r); - td.setDaemon(true); - return td; - } - } - private final SolrZkClient zkClient; private final boolean closeClient; @@ -222,15 +208,13 @@ try { ZkStateReader.this.createClusterStateWatchersAndUpdate(); } catch (KeeperException e) { - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + LOG.error("Interrupted", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted", e); } } }); @@ -280,7 +264,7 @@ DocCollection collection = clusterState.getCollectionOrNull(coll); if (collection == null) return null; if (collection.getZNodeVersion() < version) { - log.debug("server older than client {}<{}", collection.getZNodeVersion(), version); + LOG.debug("server older than client {}<{}", collection.getZNodeVersion(), version); DocCollection nu = getCollectionLive(this, coll); if (nu == null) return -1 ; if (nu.getZNodeVersion() > collection.getZNodeVersion()) { @@ -293,7 +277,7 @@ return null; } - log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion()); + LOG.debug("Wrong version from client [{}]!=[{}]", version, collection.getZNodeVersion()); return collection.getZNodeVersion(); } @@ -302,7 +286,7 @@ InterruptedException { // We need to fetch the current cluster state and the set of live nodes - log.info("Updating cluster state from ZooKeeper... "); + LOG.info("Updating cluster state from ZooKeeper... "); // Sanity check ZK structure. if (!zkClient.exists(CLUSTER_STATE, true)) { @@ -331,32 +315,23 @@ } try { synchronized (ZkStateReader.this.getUpdateLock()) { - log.info("Updating aliases... "); + LOG.info("Updating aliases... "); // remake watch final Watcher thisWatch = this; - Stat stat = new Stat(); - byte[] data = zkClient.getData(ALIASES, thisWatch, stat , - true); - - Aliases aliases = ClusterState.load(data); - - ZkStateReader.this.aliases = aliases; + final Stat stat = new Stat(); + final byte[] data = zkClient.getData(ALIASES, thisWatch, stat, true); + ZkStateReader.this.aliases = ClusterState.load(data); } + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException( - SolrException.ErrorCode.SERVER_ERROR, "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.warn("", e); - return; + LOG.warn("Interrupted", e); } } @@ -393,34 +368,29 @@ } try { synchronized (ZkStateReader.this.getUpdateLock()) { - log.info("Updating {} ... ", path); + LOG.info("Updating [{}] ... ", path); // remake watch final Watcher thisWatch = this; - Stat stat = new Stat(); - byte[] data = getZkClient().getData(path, thisWatch, stat, true); + final Stat stat = new Stat(); + final byte[] data = getZkClient().getData(path, thisWatch, stat, true); try { callback.call(new Pair<>(data, stat)); } catch (Exception e) { if (e instanceof KeeperException) throw (KeeperException) e; if (e instanceof InterruptedException) throw (InterruptedException) e; - log.error("Error running collections node listener", e); + LOG.error("Error running collections node listener", e); } } + } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException( - ErrorCode.SERVER_ERROR, "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.warn("", e); - return; + LOG.warn("Interrupted", e); } } @@ -454,7 +424,7 @@ } this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion); - log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}", + LOG.debug("clusterStateSet: version [{}] legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", clusterState.getZkClusterStateVersion(), legacyCollectionStates.keySet(), interestingCollections, @@ -469,9 +439,9 @@ private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException { try { - Stat stat = new Stat(); - byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); - ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE); + final Stat stat = new Stat(); + final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); + final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE); synchronized (getUpdateLock()) { this.legacyCollectionStates = loadedData.getCollectionStates(); this.legacyClusterStateVersion = stat.getVersion(); @@ -512,7 +482,7 @@ try { children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true); } catch (KeeperException.NoNodeException e) { - log.warn("Error fetching collection names"); + LOG.warn("Error fetching collection names: [{}]", e.getMessage()); // fall through } if (children == null || children.isEmpty()) { @@ -569,7 +539,7 @@ Set newLiveNodes; try { List nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true); - log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size()); + LOG.debug("Updating live nodes from ZooKeeper... [{}]", nodeList.size()); newLiveNodes = new HashSet<>(nodeList); } catch (KeeperException.NoNodeException e) { newLiveNodes = emptySet(); @@ -600,14 +570,12 @@ } } - public String getLeaderUrl(String collection, String shard, int timeout) - throws InterruptedException, KeeperException { - ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, - shard, timeout)); + public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException { + ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout)); return props.getCoreUrl(); } - public Replica getLeader(String collection, String shard) throws InterruptedException { + public Replica getLeader(String collection, String shard) { if (clusterState != null) { Replica replica = clusterState.getLeader(collection, shard); if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) { @@ -714,23 +682,19 @@ } public void updateAliases() throws KeeperException, InterruptedException { - byte[] data = zkClient.getData(ALIASES, null, null, true); - - Aliases aliases = ClusterState.load(data); - - ZkStateReader.this.aliases = aliases; + final byte[] data = zkClient.getData(ALIASES, null, null, true); + this.aliases = ClusterState.load(data); } + public Map getClusterProps(){ - Map result = null; try { - if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)){ - result = (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ; + if (getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)) { + return (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ; } else { - result= new LinkedHashMap(); + return new LinkedHashMap(); } - return result; } catch (Exception e) { - throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading cluster properties",e) ; + throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading cluster properties", e); } } @@ -769,16 +733,12 @@ properties.put(propertyName, propertyValue); getZkClient().create(CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true); } - } catch (KeeperException.BadVersionException bve) { - log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion()); + } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) { + LOG.warn("Race condition while trying to set a new cluster prop on current version [{}]", s.getVersion()); //race condition continue; - } catch (KeeperException.NodeExistsException nee) { - log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion()); - //race condition - continue; } catch (Exception ex) { - log.error("Error updating path " + CLUSTER_PROPS, ex); + LOG.error("Error updating path [{}]", CLUSTER_PROPS, ex); throw new SolrException(ErrorCode.SERVER_ERROR, "Error updating cluster property " + propertyName, ex); } break; @@ -785,8 +745,6 @@ } } - - /** * Returns the content of /security.json from ZooKeeper as a Map * If the files doesn't exist, it returns null. @@ -799,8 +757,7 @@ try { Stat stat = new Stat(); if(getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) { - byte[] data = getZkClient() - .getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true); + final byte[] data = getZkClient().getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true); return data != null && data.length > 0 ? new ConfigData((Map) Utils.fromJSON(data), stat.getVersion()) : null; @@ -810,6 +767,7 @@ } return null; } + /** * Returns the baseURL corresponding to a given node's nodeName -- * NOTE: does not (currently) imply that the nodeName (or resulting @@ -846,7 +804,7 @@ public void process(WatchedEvent event) { if (!interestingCollections.contains(coll)) { // This collection is no longer interesting, stop watching. - log.info("Uninteresting collection {}", coll); + LOG.info("Uninteresting collection [{}]", coll); return; } @@ -856,9 +814,10 @@ return; } - log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})", - (event), coll, ZkStateReader.this.clusterState == null ? 0 - : ZkStateReader.this.clusterState.getLiveNodes().size()); + int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 + : ZkStateReader.this.clusterState.getLiveNodes().size(); + LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", + event, coll, liveNodesSize); refreshAndWatch(true); synchronized (getUpdateLock()) { @@ -879,20 +838,16 @@ updateWatchedCollection(coll, newState); } catch (KeeperException.NoNodeException e) { if (expectExists) { - log.warn("State node vanished for collection: " + coll, e); + LOG.warn("State node vanished for collection: [{}]", coll, e); } + } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("Unwatched collection: " + coll, e); - throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e); - + LOG.error("Unwatched collection: [{}]", coll, e); + throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - log.error("Unwatched collection :" + coll, e); + LOG.error("Unwatched collection: [{}]", coll, e); } } } @@ -907,7 +862,8 @@ if (EventType.None.equals(event.getType())) { return; } - log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size()); + int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size(); + LOG.info("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize); refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); @@ -921,19 +877,15 @@ } catch (KeeperException.NoNodeException e) { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready"); + } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.warn("", e); + LOG.warn("Interrupted", e); } } } @@ -948,7 +900,7 @@ if (EventType.None.equals(event.getType())) { return; } - log.info("A collections change: {}, has occurred - updating...", (event)); + LOG.info("A collections change: [{}], has occurred - updating...", event); refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); @@ -959,19 +911,15 @@ public void refreshAndWatch() { try { refreshCollectionList(this); + } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.warn("", e); + LOG.warn("Interrupted", e); } } } @@ -986,7 +934,7 @@ if (EventType.None.equals(event.getType())) { return; } - log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size()); + LOG.info("A live node change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodes.size()); refreshAndWatch(); } @@ -993,34 +941,27 @@ public void refreshAndWatch() { try { refreshLiveNodes(this); + } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { + LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { - if (e.code() == KeeperException.Code.SESSIONEXPIRED - || e.code() == KeeperException.Code.CONNECTIONLOSS) { - log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK"); - return; - } - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + LOG.error("A ZK error has occurred", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e); } catch (InterruptedException e) { // Restore the interrupted status Thread.currentThread().interrupt(); - log.warn("", e); + LOG.warn("Interrupted", e); } } } - public static DocCollection getCollectionLive(ZkStateReader zkStateReader, - String coll) { + public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) { try { return zkStateReader.fetchCollectionState(coll, null); } catch (KeeperException e) { - throw new SolrException(ErrorCode.BAD_REQUEST, - "Could not load collection from ZK:" + coll, e); + throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new SolrException(ErrorCode.BAD_REQUEST, - "Could not load collection from ZK:" + coll, e); + throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e); } } @@ -1042,9 +983,9 @@ return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; } - public void addCollectionWatch(String coll) throws KeeperException, InterruptedException { + public void addCollectionWatch(String coll) { if (interestingCollections.add(coll)) { - log.info("addZkWatch {}", coll); + LOG.info("addZkWatch [{}]", coll); new StateWatcher(coll).refreshAndWatch(false); synchronized (getUpdateLock()) { constructState(); @@ -1054,7 +995,7 @@ private void updateWatchedCollection(String coll, DocCollection newState) { if (newState == null) { - log.info("Deleting data for {}", coll); + LOG.info("Deleting data for [{}]", coll); watchedCollectionStates.remove(coll); return; } @@ -1067,7 +1008,7 @@ DocCollection oldState = watchedCollectionStates.get(coll); if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { - log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion()); + LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); break; } } else { @@ -1076,7 +1017,7 @@ break; } if (watchedCollectionStates.replace(coll, oldState, newState)) { - log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); + LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); break; } } @@ -1085,13 +1026,13 @@ // Resolve race with removeZKWatch. if (!interestingCollections.contains(coll)) { watchedCollectionStates.remove(coll); - log.info("Removing uninteresting collection {}", coll); + LOG.info("Removing uninteresting collection [{}]", coll); } } /** This is not a public API. Only used by ZkController */ public void removeZKWatch(String coll) { - log.info("Removing watch for uninteresting collection {}", coll); + LOG.info("Removing watch for uninteresting collection [{}]", coll); interestingCollections.remove(coll); watchedCollectionStates.remove(coll); lazyCollectionStates.put(coll, new LazyCollectionRef(coll));