1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.opencastproject.assetmanager.impl.query;
22
23 import static com.entwinemedia.fn.Stream.$;
24
25 import org.opencastproject.assetmanager.api.Property;
26 import org.opencastproject.assetmanager.api.Snapshot;
27 import org.opencastproject.assetmanager.api.query.ARecord;
28 import org.opencastproject.assetmanager.api.query.AResult;
29 import org.opencastproject.assetmanager.api.query.ASelectQuery;
30 import org.opencastproject.assetmanager.api.query.Order;
31 import org.opencastproject.assetmanager.api.query.Predicate;
32 import org.opencastproject.assetmanager.impl.AssetManagerImpl;
33 import org.opencastproject.assetmanager.impl.RuntimeTypes;
34 import org.opencastproject.assetmanager.impl.persistence.EntityPaths;
35 import org.opencastproject.assetmanager.impl.persistence.PropertyDto;
36 import org.opencastproject.assetmanager.impl.persistence.QPropertyDto;
37 import org.opencastproject.assetmanager.impl.persistence.SnapshotDto;
38 import org.opencastproject.util.RequireUtil;
39 import org.opencastproject.util.data.Function;
40
41 import com.entwinemedia.fn.Fn;
42 import com.entwinemedia.fn.Fn2;
43 import com.entwinemedia.fn.Stream;
44 import com.entwinemedia.fn.data.Opt;
45 import com.entwinemedia.fn.data.SetB;
46 import com.entwinemedia.fn.fns.Booleans;
47 import com.mysema.query.Tuple;
48 import com.mysema.query.jpa.impl.JPAQuery;
49 import com.mysema.query.jpa.impl.JPAQueryFactory;
50 import com.mysema.query.types.EntityPath;
51 import com.mysema.query.types.Expression;
52 import com.mysema.query.types.OrderSpecifier;
53 import com.mysema.query.types.expr.BooleanExpression;
54
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import java.util.ArrayList;
59 import java.util.HashMap;
60 import java.util.LinkedHashSet;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.Optional;
64 import java.util.Set;
65 import java.util.stream.Collectors;
66
67 public abstract class AbstractASelectQuery implements ASelectQuery, SelectQueryContributor, EntityPaths {
68 protected static final Logger logger = LoggerFactory.getLogger(AbstractASelectQuery.class);
69
70 private final AbstractASelectQuery self = this;
71 private final AssetManagerImpl am;
72
73 public AbstractASelectQuery(AssetManagerImpl am) {
74 this.am = am;
75 }
76
77 @Override public ASelectQuery where(final Predicate predicate) {
78 return new AbstractASelectQuery(am) {
79 @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
80 final SelectQueryContribution predicateContrib = RuntimeTypes.convert(predicate).contributeSelect(f);
81 return self.contributeSelect(f)
82 .addFrom(predicateContrib.from)
83 .addJoin(predicateContrib.join)
84 .andWhere(predicateContrib.where);
85 }
86
87 @Override public String toString() {
88 return "where " + predicate;
89 }
90 };
91 }
92
93 @Override public ASelectQuery page(final int offset, final int size) {
94 return new AbstractASelectQuery(am) {
95 @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
96 return self.contributeSelect(f).offset(offset).limit(size);
97 }
98 };
99 }
100
101 @Override public ASelectQuery orderBy(final Order order) {
102 return new AbstractASelectQuery(am) {
103 @Override public SelectQueryContribution contributeSelect(JPAQueryFactory f) {
104 final SelectQueryContribution orderContrib = RuntimeTypes.convert(order).contributeSelect(f);
105 return self.contributeSelect(f).addOrder(orderContrib.order).andWhere(orderContrib.where);
106 }
107 };
108 }
109
110 @Override public AResult run() {
111 return am.getDatabase().run(new Function<JPAQueryFactory, AResult>() {
112 @Override public AResult apply(JPAQueryFactory f) {
113 return run(f);
114 }
115 });
116 }
117
118 private AResult run(JPAQueryFactory f) {
119
120 final long startTime = System.nanoTime();
121
122 final SelectQueryContribution r = contributeSelect(f);
123 final boolean toFetchProperties = r.fetch.exists(Booleans.<Expression<?>>eq(QPropertyDto.propertyDto));
124
125 final JPAQuery q = f.query();
126
127 {
128
129
130
131 final Set<EntityPath<?>> from = Stream.<EntityPath<?>>mk(Q_SNAPSHOT)
132 .append(r.from)
133 .append(r.join.map(Join.getFrom))
134 .toSet(SetB.MH);
135
136
137 from.removeAll(r.join.map(Join.getJoin).toSet());
138 q.from(JpaFns.toEntityPathArray(from));
139 }
140
141 if (!r.join.isEmpty()) {
142
143
144
145
146
147 final Map<EntityPath<?>, BooleanExpression> joins = r.join.foldl(
148 new HashMap<EntityPath<?>, BooleanExpression>(),
149 new Fn2<Map<EntityPath<?>, BooleanExpression>, Join, Map<EntityPath<?>, BooleanExpression>>() {
150 @Override
151 public Map<EntityPath<?>, BooleanExpression> apply(Map<EntityPath<?>, BooleanExpression> sum, Join join) {
152
153 final BooleanExpression existing = sum.get(join.join);
154 final BooleanExpression combined;
155
156 if (existing == null) {
157 combined = join.on;
158 } else if (existing.equals(join.on)) {
159
160 combined = existing;
161 } else {
162
163 combined = existing.or(join.on);
164 }
165 sum.put(join.join, combined);
166 return sum;
167 }
168 });
169 for (final Map.Entry<EntityPath<?>, BooleanExpression> j : joins.entrySet()) {
170 q.leftJoin(j.getKey()).on(j.getValue());
171 }
172 }
173
174 q.where(r.where.orNull());
175
176 for (Integer a : r.offset) {
177 q.offset(a);
178 }
179 for (Integer a : r.limit) {
180 q.limit(a);
181 }
182
183 for (OrderSpecifier<?> a : r.order) {
184 q.orderBy(a);
185 }
186
187 if (!toFetchProperties) {
188
189 q.distinct();
190 }
191
192
193 final List<Expression<?>> fetch;
194 {
195
196 if (r.fetch.exists(MandatoryFetch.exists)) {
197 fetch = r.fetch.toList();
198 } else {
199 fetch = r.fetch.append(MandatoryFetch.fetch).toList();
200 }
201 }
202
203 final LinkedHashSet<ARecordImpl> records;
204 {
205
206 am.getDatabase().logQuery(q);
207 final List<Tuple> result = q.list(JpaFns.toExpressionArray(fetch));
208 logger.debug("Pure query ms " + (System.nanoTime() - startTime) / 1000000);
209
210 if (!toFetchProperties) {
211
212 records = result.stream()
213 .map(tuple -> toARecord(tuple, r))
214 .map(record -> {
215 Optional<Snapshot> snapshotOpt = record.getSnapshot();
216 Snapshot snapshot = null;
217 if (snapshotOpt.isPresent()) {
218
219 snapshot = am.getHttpAssetProvider().prepareForDelivery(snapshotOpt.get());
220 }
221 return new ARecordImpl(
222 record.getSnapshotId(),
223 record.getMediaPackageId(),
224 record.getProperties(),
225 snapshot);
226 }).collect(Collectors.toCollection(LinkedHashSet::new));
227 } else {
228 logger.trace("Fetched properties");
229
230
231
232
233
234 final Map<String, Set<Property>> propertiesPerMp = $(result).bind(toProperty).foldl(
235 new HashMap<String, Set<Property>>(),
236 new Fn2<Map<String, Set<Property>>, Property, Map<String, Set<Property>>>() {
237 @Override
238 public Map<String, Set<Property>> apply(Map<String, Set<Property>> sum, Property p) {
239 final String mpId = p.getId().getMediaPackageId();
240 final Set<Property> props = sum.get(mpId);
241 if (props != null) {
242 props.add(p);
243 } else {
244 sum.put(mpId, SetB.MH.mk(p));
245 }
246 return sum;
247 }
248 });
249
250 final Map<String, List<ARecordImpl>> distinctRecords = result.stream()
251 .map(tuple -> toARecord(tuple, r))
252 .collect(Collectors.groupingBy(record -> record.getMediaPackageId()));
253 records = distinctRecords.values().stream()
254 .flatMap(List::stream)
255 .map(record -> {
256 final Set<Property> properties = propertiesPerMp.get(record.getMediaPackageId());
257 final List<Property> p = properties != null
258 ? properties.stream().collect(Collectors.toList()) : new ArrayList<>();
259 Snapshot snapshot = null;
260 Optional<Snapshot> snapshotOpt = record.getSnapshot();
261 if (snapshotOpt.isPresent()) {
262
263 snapshot = am.getHttpAssetProvider().prepareForDelivery(snapshotOpt.get());
264 }
265 return new ARecordImpl(record.getSnapshotId(), record.getMediaPackageId(), p, snapshot);
266 })
267 .collect(Collectors.toCollection(LinkedHashSet::new));
268 }
269 }
270 final long searchTime = (System.nanoTime() - startTime) / 1000000;
271 logger.debug("Complete query ms " + searchTime);
272 LinkedHashSet<ARecord> narrowRecords = new LinkedHashSet<>();
273 for (ARecordImpl recordImpl : records) {
274 narrowRecords.add(recordImpl);
275 }
276 return new AResultImpl(
277 narrowRecords,
278 narrowRecords.size(),
279 r.offset.getOr(0),
280 r.limit.getOr(-1),
281 searchTime
282 );
283 }
284
285
286
287
288
289 private Fn<Tuple, ARecordImpl> toARecord(final SelectQueryContribution c) {
290 return new Fn<Tuple, ARecordImpl>() {
291 @Override public ARecordImpl apply(Tuple tuple) {
292 final String mediaPackageId;
293 SnapshotDto snapshotDto = null;
294 final long id;
295
296 if (c.fetch.exists(Booleans.<Expression<?>>eq(Q_SNAPSHOT))) {
297 snapshotDto = RequireUtil.notNull(tuple.get(Q_SNAPSHOT), "[BUG] snapshot table data");
298 id = snapshotDto.getId();
299 mediaPackageId = snapshotDto.getMediaPackageId();
300 } else {
301
302 id = RequireUtil.notNull(tuple.get(Q_SNAPSHOT.id), "[BUG] snapshot table id");
303 mediaPackageId = RequireUtil.notNull(
304 tuple.get(Q_SNAPSHOT.mediaPackageId),
305 "[BUG] snapshot table media package id"
306 );
307 }
308 return new ARecordImpl(id, mediaPackageId, new ArrayList<>(), snapshotDto);
309 }
310 };
311 }
312
313 private ARecordImpl toARecord(Tuple tuple, final SelectQueryContribution c) {
314 final String mediaPackageId;
315 SnapshotDto snapshotDto = null;
316 final long id;
317
318 if (c.fetch.exists(Booleans.<Expression<?>>eq(Q_SNAPSHOT))) {
319 snapshotDto = RequireUtil.notNull(tuple.get(Q_SNAPSHOT), "[BUG] snapshot table data");
320 id = snapshotDto.getId();
321 mediaPackageId = snapshotDto.getMediaPackageId();
322 } else {
323
324 id = RequireUtil.notNull(tuple.get(Q_SNAPSHOT.id), "[BUG] snapshot table id");
325 mediaPackageId = RequireUtil.notNull(
326 tuple.get(Q_SNAPSHOT.mediaPackageId),
327 "[BUG] snapshot table media package id"
328 );
329 }
330 return new ARecordImpl(id, mediaPackageId, new ArrayList<>(), snapshotDto);
331 }
332
333 private static Fn<Tuple, Opt<Property>> toProperty = new Fn<Tuple, Opt<Property>>() {
334 @Override public Opt<Property> apply(Tuple tuple) {
335 final PropertyDto dto = tuple.get(Q_PROPERTY);
336 return dto != null ? Opt.some(dto.toProperty()) : Opt.<Property>none();
337 }
338 };
339
340
341
342
343 private static final class MandatoryFetch {
344 static final Fn<Expression<?>, Boolean> exists =
345 Booleans.<Expression<?>>eq(Q_SNAPSHOT)
346 .or(Booleans.<Expression<?>>eq(Q_SNAPSHOT.mediaPackageId))
347 .or(Booleans.<Expression<?>>eq(Q_SNAPSHOT.id));
348
349 static final Stream<Expression<?>> fetch = Stream.<Expression<?>>mk(Q_SNAPSHOT.mediaPackageId, Q_SNAPSHOT.id);
350 }
351
352 private static <A> Stream<A> vary(Stream<? extends A> a) {
353 return (Stream<A>) a;
354 }
355
356 private static <A> int sizeOf(Stream<A> stream) {
357 int count = 0;
358 for (A ignore : stream) {
359 count++;
360 }
361 return count;
362 }
363 }