View Javadoc
1   /*
2    * Licensed to The Apereo Foundation under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional
4    * information regarding copyright ownership.
5    *
6    *
7    * The Apereo Foundation licenses this file to you under the Educational
8    * Community License, Version 2.0 (the "License"); you may not use this file
9    * except in compliance with the License. You may obtain a copy of the License
10   * at:
11   *
12   *   http://opensource.org/licenses/ecl2.txt
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
17   * License for the specific language governing permissions and limitations under
18   * the License.
19   *
20   */
21  package org.opencastproject.assetmanager.impl.query;
22  
import static com.entwinemedia.fn.Stream.$;

import org.opencastproject.assetmanager.api.Property;
import org.opencastproject.assetmanager.api.Snapshot;
import org.opencastproject.assetmanager.api.query.ARecord;
import org.opencastproject.assetmanager.api.query.AResult;
import org.opencastproject.assetmanager.api.query.ASelectQuery;
import org.opencastproject.assetmanager.api.query.Order;
import org.opencastproject.assetmanager.api.query.Predicate;
import org.opencastproject.assetmanager.impl.AssetManagerImpl;
import org.opencastproject.assetmanager.impl.RuntimeTypes;
import org.opencastproject.assetmanager.impl.persistence.EntityPaths;
import org.opencastproject.assetmanager.impl.persistence.PropertyDto;
import org.opencastproject.assetmanager.impl.persistence.QPropertyDto;
import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
import org.opencastproject.util.RequireUtil;
import org.opencastproject.util.data.Function;

import com.entwinemedia.fn.Fn;
import com.entwinemedia.fn.Fn2;
import com.entwinemedia.fn.Stream;
import com.entwinemedia.fn.data.Opt;
import com.entwinemedia.fn.data.SetB;
import com.entwinemedia.fn.fns.Booleans;
import com.mysema.query.Tuple;
import com.mysema.query.jpa.impl.JPAQuery;
import com.mysema.query.jpa.impl.JPAQueryFactory;
import com.mysema.query.types.EntityPath;
import com.mysema.query.types.Expression;
import com.mysema.query.types.OrderSpecifier;
import com.mysema.query.types.expr.BooleanExpression;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
66  
67  public abstract class AbstractASelectQuery implements ASelectQuery, SelectQueryContributor, EntityPaths {
68    protected static final Logger logger = LoggerFactory.getLogger(AbstractASelectQuery.class);
69  
70    private final AbstractASelectQuery self = this;
71    private final AssetManagerImpl am;
72  
73    public AbstractASelectQuery(AssetManagerImpl am) {
74      this.am = am;
75    }
76  
77    @Override public ASelectQuery where(final Predicate predicate) {
78      return new AbstractASelectQuery(am) {
79        @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
80          final SelectQueryContribution predicateContrib = RuntimeTypes.convert(predicate).contributeSelect(f);
81          return self.contributeSelect(f)
82                  .addFrom(predicateContrib.from)
83                  .addJoin(predicateContrib.join)
84                  .andWhere(predicateContrib.where);
85        }
86  
87        @Override public String toString() {
88          return "where " + predicate;
89        }
90      };
91    }
92  
93    @Override public ASelectQuery page(final int offset, final int size) {
94      return new AbstractASelectQuery(am) {
95        @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
96          return self.contributeSelect(f).offset(offset).limit(size);
97        }
98      };
99    }
100 
101   @Override public ASelectQuery orderBy(final Order order) {
102     return new AbstractASelectQuery(am) {
103       @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
104         final SelectQueryContribution orderContrib = RuntimeTypes.convert(order).contributeSelect(f);
105         return self.contributeSelect(f).addOrder(orderContrib.order).andWhere(orderContrib.where);
106       }
107     };
108   }
109 
110   @Override public AResult run() {
111     return am.getDatabase().run(new Function<JPAQueryFactory, AResult>() {
112       @Override public AResult apply(JPAQueryFactory f) {
113         return run(f);
114       }
115     });
116   }
117 
118   private AResult run(JPAQueryFactory f) {
119     // run query and map the result to records
120     final long startTime = System.nanoTime();
121     // resolve AST
122     final SelectQueryContribution r = contributeSelect(f);
123     final boolean toFetchProperties = r.fetch.exists(Booleans.<Expression<?>>eq(QPropertyDto.propertyDto));
124     // # create Querydsl query
125     final JPAQuery q = f.query();
126     // # from
127     {
128       // Make sure that the snapshotDto is always contained in the from clause because the media package ID and
129       //   the ID are always selected.
130       // Use a mutable hash set to be able to use the removeAll operation.
131       final Set<EntityPath<?>> from = Stream.<EntityPath<?>>mk(Q_SNAPSHOT)
132               .append(r.from) // all collected from clauses
133               .append(r.join.map(Join.getFrom)) // all from clauses from the joins
134               .toSet(SetB.MH);
135       // Now remove everything that will be joined. Adding them in both the from and a join
136       //   clause is not allowed.
137       from.removeAll(r.join.map(Join.getJoin).toSet());
138       q.from(JpaFns.toEntityPathArray(from));
139     }
140     // # join
141     if (!r.join.isEmpty()) {
142       // Group joins by entity and combine all "on" clauses with "or" expressions.
143       // This way there is only one join clause per distinct entity which eliminates the need to alias entities
144       //   like this `new QPropertyDto("alias")`.
145       // Entity aliasing produces many issues which seem to cause a huge rewrite of the query building mechanism
146       //   so it should be prevented at all costs.
147       final Map<EntityPath<?>, BooleanExpression> joins = r.join.foldl(
148           new HashMap<EntityPath<?>, BooleanExpression>(),
149           new Fn2<Map<EntityPath<?>, BooleanExpression>, Join, Map<EntityPath<?>, BooleanExpression>>() {
150             @Override
151             public Map<EntityPath<?>, BooleanExpression> apply(Map<EntityPath<?>, BooleanExpression> sum, Join join) {
152               // get the on expression saved with the join, may be null
153               final BooleanExpression existing = sum.get(join.join);
154               final BooleanExpression combined;
155               // combine the existing and the current expression
156               if (existing == null) {
157                 combined = join.on;
158               } else if (existing.equals(join.on)) {
159                 // if both expressions are equal there is no need to combine them
160                 combined = existing;
161               } else {
162                 // if different combine with logical "or"
163                 combined = existing.or(join.on);
164               }
165               sum.put(join.join, combined);
166               return sum;
167             }
168           });
169       for (final Map.Entry<EntityPath<?>, BooleanExpression> j : joins.entrySet()) {
170         q.leftJoin(j.getKey()).on(j.getValue());
171       }
172     }
173     // # where
174     q.where(r.where.orNull());
175     // # paging
176     for (Integer a : r.offset) {
177       q.offset(a);
178     }
179     for (Integer a : r.limit) {
180       q.limit(a);
181     }
182     // # order
183     for (OrderSpecifier<?> a : r.order) {
184       q.orderBy(a);
185     }
186     // # distinct
187     if (!toFetchProperties) {
188       // if no properties shall be fetched the result set can be distinct
189       q.distinct();
190     }
191     // # fetch
192     // create parameters for fetch clause, i.e. Querydsl's list() method
193     final List<Expression<?>> fetch;
194     {
195       // check if the media package ID needs to be selected separately
196       if (r.fetch.exists(MandatoryFetch.exists)) {
197         fetch = r.fetch.toList();
198       } else {
199         fetch = r.fetch.append(MandatoryFetch.fetch).toList();
200       }
201     }
202     // Run the query and transform the result into records
203     final LinkedHashSet<ARecordImpl> records;
204     {
205       // run query
206       am.getDatabase().logQuery(q);
207       final List<Tuple> result = q.list(JpaFns.toExpressionArray(fetch));
208       logger.debug("Pure query ms " + (System.nanoTime() - startTime) / 1000000);
209       // map result based on the fact whether properties have been fetched or not
210       if (!toFetchProperties) {
211         // No properties have been fetched -> each result row (tuple) is a distinct record (snapshot).
212         records = result.stream()
213             .map(tuple -> toARecord(tuple, r))
214             .map(record -> {
215               Optional<Snapshot> snapshotOpt = record.getSnapshot();
216               Snapshot snapshot = null;
217               if (snapshotOpt.isPresent()) {
218                 // make sure the delivered media package has valid URIs
219                 snapshot = am.getHttpAssetProvider().prepareForDelivery(snapshotOpt.get());
220               }
221               return new ARecordImpl(
222                   record.getSnapshotId(),
223                   record.getMediaPackageId(),
224                   record.getProperties(),
225                   snapshot);
226             }).collect(Collectors.toCollection(LinkedHashSet::new));
227       } else {
228         logger.trace("Fetched properties");
229         // Properties have been fetched -> there may be multiple rows (tuples)
230         // per snapshot because of the join with the property table. Extract
231         // records and properties and link them together.
232 
233         // group properties after their media package ID and make sure that no duplicate properties occur
234         final Map<String, Set<Property>> propertiesPerMp = $(result).bind(toProperty).foldl(
235             new HashMap<String, Set<Property>>(),
236             new Fn2<Map<String, Set<Property>>, Property, Map<String, Set<Property>>>() {
237               @Override
238               public Map<String, Set<Property>> apply(Map<String, Set<Property>> sum, Property p) {
239                 final String mpId = p.getId().getMediaPackageId();
240                 final Set<Property> props = sum.get(mpId);
241                 if (props != null) {
242                   props.add(p);
243                 } else {
244                   sum.put(mpId, SetB.MH.mk(p));
245                 }
246                 return sum;
247               }
248             });
249         // group records after their media package ID
250         final Map<String, List<ARecordImpl>> distinctRecords = result.stream()
251             .map(tuple -> toARecord(tuple, r))
252             .collect(Collectors.groupingBy(record -> record.getMediaPackageId()));
253         records = distinctRecords.values().stream()
254             .flatMap(List::stream)
255             .map(record -> {
256               final Set<Property> properties = propertiesPerMp.get(record.getMediaPackageId());
257               final List<Property> p = properties != null
258                   ? properties.stream().collect(Collectors.toList()) : new ArrayList<>();
259               Snapshot snapshot = null;
260               Optional<Snapshot> snapshotOpt = record.getSnapshot();
261               if (snapshotOpt.isPresent()) {
262                 // make sure the delivered media package has valid URIs
263                 snapshot = am.getHttpAssetProvider().prepareForDelivery(snapshotOpt.get());
264               }
265               return new ARecordImpl(record.getSnapshotId(), record.getMediaPackageId(), p, snapshot);
266             })
267             .collect(Collectors.toCollection(LinkedHashSet::new));
268       }
269     }
270     final long searchTime = (System.nanoTime() - startTime) / 1000000;
271     logger.debug("Complete query ms " + searchTime);
272     LinkedHashSet<ARecord> narrowRecords = new LinkedHashSet<>();
273     for (ARecordImpl recordImpl : records) {
274       narrowRecords.add(recordImpl);
275     }
276     return new AResultImpl(
277         narrowRecords,
278         narrowRecords.size(),
279         r.offset.getOr(0),
280         r.limit.getOr(-1),
281         searchTime
282     );
283   }
284 
285   /**
286    * Transform a Querydsl result {@link Tuple} into an {@link ARecord}.
287    * To do the transformation I need to know what targets have been selected.
288    */
289   private Fn<Tuple, ARecordImpl> toARecord(final SelectQueryContribution c) {
290     return new Fn<Tuple, ARecordImpl>() {
291       @Override public ARecordImpl apply(Tuple tuple) {
292         final String mediaPackageId;
293         SnapshotDto snapshotDto = null;
294         final long id;
295         // Only fetch the snapshot if it is in the fetch list.
296         if (c.fetch.exists(Booleans.<Expression<?>>eq(Q_SNAPSHOT))) {
297           snapshotDto = RequireUtil.notNull(tuple.get(Q_SNAPSHOT), "[BUG] snapshot table data");
298           id = snapshotDto.getId();
299           mediaPackageId = snapshotDto.getMediaPackageId();
300         } else {
301           // The media package ID and the snapshot's database ID must always be fetched.
302           id = RequireUtil.notNull(tuple.get(Q_SNAPSHOT.id), "[BUG] snapshot table id");
303           mediaPackageId = RequireUtil.notNull(
304               tuple.get(Q_SNAPSHOT.mediaPackageId),
305               "[BUG] snapshot table media package id"
306           );
307         }
308         return new ARecordImpl(id, mediaPackageId, new ArrayList<>(), snapshotDto);
309       }
310     };
311   }
312 
313   private ARecordImpl toARecord(Tuple tuple, final SelectQueryContribution c) {
314     final String mediaPackageId;
315     SnapshotDto snapshotDto = null;
316     final long id;
317     // Only fetch the snapshot if it is in the fetch list.
318     if (c.fetch.exists(Booleans.<Expression<?>>eq(Q_SNAPSHOT))) {
319       snapshotDto = RequireUtil.notNull(tuple.get(Q_SNAPSHOT), "[BUG] snapshot table data");
320       id = snapshotDto.getId();
321       mediaPackageId = snapshotDto.getMediaPackageId();
322     } else {
323       // The media package ID and the snapshot's database ID must always be fetched.
324       id = RequireUtil.notNull(tuple.get(Q_SNAPSHOT.id), "[BUG] snapshot table id");
325       mediaPackageId = RequireUtil.notNull(
326           tuple.get(Q_SNAPSHOT.mediaPackageId),
327           "[BUG] snapshot table media package id"
328       );
329     }
330     return new ARecordImpl(id, mediaPackageId, new ArrayList<>(), snapshotDto);
331   }
332 
333   private static Fn<Tuple, Opt<Property>> toProperty = new Fn<Tuple, Opt<Property>>() {
334     @Override public Opt<Property> apply(Tuple tuple) {
335       final PropertyDto dto = tuple.get(Q_PROPERTY);
336       return dto != null ? Opt.some(dto.toProperty()) : Opt.<Property>none();
337     }
338   };
339 
340   /**
341    * Specification of fields whose fetch is mandatory.
342    */
343   private static final class MandatoryFetch {
344     static final Fn<Expression<?>, Boolean> exists =
345             Booleans.<Expression<?>>eq(Q_SNAPSHOT)
346                     .or(Booleans.<Expression<?>>eq(Q_SNAPSHOT.mediaPackageId))
347                     .or(Booleans.<Expression<?>>eq(Q_SNAPSHOT.id));
348 
349     static final Stream<Expression<?>> fetch = Stream.<Expression<?>>mk(Q_SNAPSHOT.mediaPackageId, Q_SNAPSHOT.id);
350   }
351 
352   private static <A> Stream<A> vary(Stream<? extends A> a) {
353     return (Stream<A>) a;
354   }
355 
356   private static <A> int sizeOf(Stream<A> stream) {
357     int count = 0;
358     for (A ignore : stream) {
359       count++;
360     }
361     return count;
362   }
363 }