diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
index 28c4c66be2..5d451dc4a1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/Lang.java
@@ -90,6 +90,7 @@ public class Lang {
         .withFunctionName("timeseries", TimeSeriesStream.class)
         .withFunctionName("tuple", TupStream.class)
         .withFunctionName("sql", SqlStream.class)
+        .withFunctionName("plist", ParallelListStream.class)
 
         // metrics
         .withFunctionName("min", MinMetric.class)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
index 826e94803b..b1613099f2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ListStream.java
@@ -108,6 +108,10 @@ public class ListStream extends TupleStream implements Expressible {
       if (currentStream == null) {
         if (streamIndex < streams.length) {
           currentStream = streams[streamIndex];
+          // Disabled for now: nulling the array slot once the stream has become the current stream
+          // would drop the array's reference to it and should allow it to be garbage collected
+          // once it's no longer the current stream.
+          //streams[streamIndex] = null;
           currentStream.open();
         } else {
           HashMap map = new HashMap();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
new file mode 100644
index 0000000000..ef02ffad37
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelListStream.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.io.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+
+/**
+ * Concatenates the tuples of the wrapped streams, in the order the streams were supplied, after
+ * opening all of them concurrently.
+ *
+ * <p>{@link #open()} submits one open task per wrapped stream to a thread pool and waits for every
+ * task, so the individual opens can overlap. Reading stays sequential: each stream is read to EOF,
+ * closed, and then the next stream is read. {@link #close()} closes any stream that was opened but
+ * not yet read to EOF.
+ */
+public class ParallelListStream extends TupleStream implements Expressible {
+
+  private static final long serialVersionUID = 1;
+  private TupleStream[] streams;
+  private TupleStream currentStream;
+  private int streamIndex;
+  // opened[i] is true while streams[i] has been opened by this stream and not yet closed.
+  private boolean[] opened;
+
+  public ParallelListStream(TupleStream... streams) throws IOException {
+    init(streams);
+  }
+
+  public ParallelListStream(StreamExpression expression, StreamFactory factory) throws IOException {
+    List<StreamExpression> streamExpressions = factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, TupleStream.class);
+    TupleStream[] streams = new TupleStream[streamExpressions.size()];
+    for(int idx = 0; idx < streamExpressions.size(); ++idx){
+      streams[idx] = factory.constructStream(streamExpressions.get(idx));
+    }
+
+    init(streams);
+  }
+
+  private void init(TupleStream ... tupleStreams) {
+    this.streams = tupleStreams;
+    this.opened = new boolean[tupleStreams.length];
+  }
+
+  @Override
+  public StreamExpression toExpression(StreamFactory factory) throws IOException{
+    return toExpression(factory, true);
+  }
+
+  private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) throws IOException {
+    // function name
+    StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass()));
+    if(includeStreams) {
+      for(TupleStream stream : streams) {
+        expression.addParameter(((Expressible)stream).toExpression(factory));
+      }
+    }
+    return expression;
+  }
+
+  @Override
+  public Explanation toExplanation(StreamFactory factory) throws IOException {
+
+    StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString());
+    explanation.setFunctionName(factory.getFunctionName(this.getClass()));
+    explanation.setImplementingClass(this.getClass().getName());
+    explanation.setExpressionType(ExpressionType.STREAM_DECORATOR);
+    explanation.setExpression(toExpression(factory, false).toString());
+    for(TupleStream stream : streams) {
+      explanation.addChild(stream.toExplanation(factory));
+    }
+
+    return explanation;
+  }
+
+  public void setStreamContext(StreamContext context) {
+    for(TupleStream stream : streams) {
+      stream.setStreamContext(context);
+    }
+  }
+
+  public List<TupleStream> children() {
+    List<TupleStream> l = new ArrayList<>();
+    for(TupleStream stream : streams) {
+      l.add(stream);
+    }
+    return l;
+  }
+
+  /**
+   * Returns the next tuple of the current wrapped stream. When that stream reports EOF it is closed
+   * and reading moves on to the next one; an EOF tuple is returned once every stream is exhausted.
+   */
+  public Tuple read() throws IOException {
+    while(true) {
+      if (currentStream == null) {
+        if (streamIndex < streams.length) {
+          currentStream = streams[streamIndex];
+        } else {
+          HashMap map = new HashMap();
+          map.put("EOF", true);
+          return new Tuple(map);
+        }
+      }
+
+      Tuple tuple = currentStream.read();
+      if (tuple.EOF) {
+        // Clear the flag first so a failing close() here is not attempted again by close().
+        opened[streamIndex] = false;
+        currentStream.close();
+        currentStream = null;
+        ++streamIndex;
+      } else {
+        return tuple;
+      }
+    }
+  }
+
+  /** Closes every wrapped stream that was opened and has not been closed yet. */
+  public void close() throws IOException {
+    IOException failure = closeOpenStreams();
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  public void open() throws IOException {
+    openStreams();
+  }
+
+  /**
+   * Opens all wrapped streams on a thread pool and waits for the results. If any open fails (or the
+   * wait is interrupted), outstanding opens are cancelled, the streams known to have opened are
+   * closed again, and the first failure is rethrown wrapped in an {@link IOException}.
+   */
+  private void openStreams() throws IOException {
+    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("ParallelListStream"));
+    try {
+      List<Future<StreamIndex>> futures = new ArrayList<>();
+      int i=0;
+      for (TupleStream tupleStream : streams) {
+        StreamOpener so = new StreamOpener(new StreamIndex(tupleStream, i++));
+        Future<StreamIndex> future = service.submit(so);
+        futures.add(future);
+      }
+
+      Exception failure = null;
+      for (Future<StreamIndex> f : futures) {
+        try {
+          StreamIndex result = f.get();
+          this.streams[result.getIndex()] = result.getTupleStream();
+          opened[result.getIndex()] = true;
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the caller and stop waiting for the remaining opens.
+          Thread.currentThread().interrupt();
+          if (failure == null) {
+            failure = e;
+          }
+          break;
+        } catch (Exception e) {
+          // Remember the first failure but keep waiting, so every stream that did open is known.
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);
+          }
+        }
+      }
+
+      if (failure != null) {
+        for (Future<StreamIndex> f : futures) {
+          f.cancel(true);
+        }
+        IOException closeFailure = closeOpenStreams();
+        if (closeFailure != null) {
+          failure.addSuppressed(closeFailure);
+        }
+        throw new IOException(failure);
+      }
+    } finally {
+      service.shutdown();
+    }
+  }
+
+  /**
+   * Closes each wrapped stream still flagged as open, attempting all of them even if one fails.
+   *
+   * @return the first {@link IOException} raised by a close (later ones attached as suppressed), or
+   *         {@code null} if every close succeeded
+   */
+  private IOException closeOpenStreams() {
+    IOException first = null;
+    for (int idx = 0; idx < streams.length; ++idx) {
+      if (!opened[idx]) {
+        continue;
+      }
+      opened[idx] = false;
+      try {
+        streams[idx].close();
+      } catch (IOException e) {
+        if (first == null) {
+          first = e;
+        } else {
+          first.addSuppressed(e);
+        }
+      }
+    }
+    return first;
+  }
+
+  /** Task that opens one wrapped stream and hands back the stream together with its position. */
+  protected class StreamOpener implements Callable<StreamIndex> {
+
+    private StreamIndex streamIndex;
+
+    public StreamOpener(StreamIndex streamIndex) {
+      this.streamIndex = streamIndex;
+    }
+
+    public StreamIndex call() throws Exception {
+      streamIndex.getTupleStream().open();
+      return streamIndex;
+    }
+  }
+
+  /** Pairs a wrapped stream with its position in the {@code streams} array. */
+  protected class StreamIndex {
+    private TupleStream tupleStream;
+    private int index;
+
+    public StreamIndex(TupleStream tupleStream, int index) {
+      this.tupleStream = tupleStream;
+      this.index = index;
+    }
+
+    public int getIndex() {
+      return this.index;
+    }
+
+    public TupleStream getTupleStream() {
+      return this.tupleStream;
+    }
+  }
+
+  /** Return the stream sort - ie, the order in which records are returned */
+  public StreamComparator getStreamSort(){
+    return null;
+  }
+
+  public int getCost() {
+    return 0;
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index c87dc24f39..fde82988a2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -51,6 +51,7 @@ public class TupStream extends TupleStream implements Expressible {
   private Map<String,TupleStream> streamParams = new HashMap<>();
   private List<String> fieldNames = new ArrayList();
   private Map<String, String> fieldLabels = new HashMap();
+  private Tuple tup = null;
 
   private boolean finished;
 
@@ -151,50 +152,6 @@ public class TupStream extends TupleStream implements Expressible {
       return new Tuple(m);
     } else {
       finished = true;
-      Map<String, Object> values = new HashMap<>();
-
-      // add all string based params
-      // these could come from the context, or they will just be treated as straight strings
-      for(Entry<String,String> param : stringParams.entrySet()){
-        if(streamContext.getLets().containsKey(param.getValue())){
-          values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
-        }
-        else{
-          values.put(param.getKey(), param.getValue());
-        }
-      }
-
-      // add all evaluators
-      for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
-        values.put(param.getKey(), param.getValue().evaluateOverContext());
-      }
-
-      // Add all streams
-      for(Entry<String,TupleStream> param : streamParams.entrySet()){
-
-        try{
-          List<Tuple> streamTuples = new ArrayList();
-          // open the stream, closed in finally block
-          param.getValue().open();
-
-          // read all values from stream (memory expensive)
-          Tuple streamTuple = param.getValue().read();
-          while(!streamTuple.EOF){
-            streamTuples.add(streamTuple);
-            streamTuple = param.getValue().read();
-          }
-
-          values.put(param.getKey(), streamTuples);
-        }
-        finally{
-          // safely close the stream
-          param.getValue().close();
-        }
-      }
-
-      Tuple tup = new Tuple(values);
-      tup.fieldNames = fieldNames;
-      tup.fieldLabels = fieldLabels;
       return tup;
     }
   }
@@ -204,6 +161,49 @@ public class TupStream extends TupleStream implements Expressible {
   }
 
   public void open() throws IOException {
+    Map<String, Object> values = new HashMap<>();
+
+    // add all string based params
+    // these could come from the context, or they will just be treated as straight strings
+    for(Entry<String,String> param : stringParams.entrySet()){
+      if(streamContext.getLets().containsKey(param.getValue())){
+        values.put(param.getKey(), streamContext.getLets().get(param.getValue()));
+      }
+      else{
+        values.put(param.getKey(), param.getValue());
+      }
+    }
+
+    // add all evaluators
+    for(Entry<String,StreamEvaluator> param : evaluatorParams.entrySet()){
+      values.put(param.getKey(), param.getValue().evaluateOverContext());
+    }
+
+    // Add all streams
+    for(Entry<String,TupleStream> param : streamParams.entrySet()){
+
+      try{
+        List<Tuple> streamTuples = new ArrayList();
+        // open the stream, closed in finally block
+        param.getValue().open();
+
+        // read all values from stream (memory expensive)
+        Tuple streamTuple = param.getValue().read();
+        while(!streamTuple.EOF){
+          streamTuples.add(streamTuple);
+          streamTuple = param.getValue().read();
+        }
+
+        values.put(param.getKey(), streamTuples);
+      }
+      finally{
+        // safely close the stream
+        param.getValue().close();
+      }
+    }
+
+    this.tup = new Tuple(values);
+    tup.fieldNames = fieldNames;
+    tup.fieldLabels = fieldLabels;
-    // nothing to do here
   }
 
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
index deb4522760..20fa6d2fff 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/MathExpressionTest.java
@@ -235,7 +235,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
   @Test
   public void testMemsetSize() throws Exception {
     String expr = "let(echo=\"b, c\"," +
-        " a=memset(list(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
+        " a=memset(plist(tuple(field1=val(1), field2=val(10)), tuple(field1=val(2), field2=val(20))), " +
         " cols=\"field1, field2\", " +
         " vars=\"f1, f2\"," +
         " size=1)," +
@@ -1844,7 +1844,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test exclude. This should drop off the term jim
 
     cexpr = "let(echo=true," +
-        " a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        " a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         " tuple(id=\"2\", text=\"hello steve\"), " +
         " tuple(id=\"3\", text=\"hello jim jim\"), " +
         " tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -1916,7 +1916,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test minDocFreq attribute at .5. This should eliminate all but the term hello
 
     cexpr = "let(echo=true," +
-        "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         "tuple(id=\"2\", text=\"hello steve\"), " +
         "tuple(id=\"3\", text=\"hello jim jim\"), " +
         "tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +
@@ -1970,7 +1970,7 @@ public class MathExpressionTest extends SolrCloudTestCase {
     //Test maxDocFreq attribute at 0. This should eliminate all terms
 
     cexpr = "let(echo=true," +
-        "a=select(list(tuple(id=\"1\", text=\"hello world\"), " +
+        "a=select(plist(tuple(id=\"1\", text=\"hello world\"), " +
         "tuple(id=\"2\", text=\"hello steve\"), " +
         "tuple(id=\"3\", text=\"hello jim jim\"), " +
         "tuple(id=\"4\", text=\"hello jack\")), id, analyze(text, test_t) as terms)," +