1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.opencastproject.elasticsearch.index.rebuild;
23
24 import static java.lang.String.format;
25
26 import org.opencastproject.elasticsearch.index.ElasticsearchIndex;
27
28 import org.osgi.framework.BundleActivator;
29 import org.osgi.framework.BundleContext;
30 import org.osgi.framework.ServiceEvent;
31 import org.osgi.framework.ServiceListener;
32 import org.osgi.framework.ServiceReference;
33 import org.osgi.framework.ServiceRegistration;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.io.IOException;
38 import java.util.HashMap;
39 import java.util.Map;
40 import java.util.concurrent.ConcurrentHashMap;
41
42
43
44
45 public class IndexRebuildService implements BundleActivator {
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public enum Service {
68 Themes, Series, Scheduler, AssetManager, Comments, Workflow, Search
69 }
70
71 public enum DataType {
72 ALL, ACL
73 }
74
75 public enum State {
76 PENDING, RUNNING, OK, ERROR
77 }
78
79 private static final Logger logger = LoggerFactory.getLogger(IndexRebuildService.class);
80 private final Map<IndexRebuildService.Service, IndexProducer> indexProducers = new ConcurrentHashMap<>();
81 private ServiceRegistration<?> serviceRegistration = null;
82
83
84
85
86
87
88
89
90
91 @Override
92 public void start(BundleContext bundleContext) throws Exception {
93
94 ServiceReference<?>[] serviceReferences = bundleContext.getAllServiceReferences(IndexProducer.class.getName(),
95 null);
96 if (serviceReferences != null) {
97 for (ServiceReference<?> serviceReference : serviceReferences) {
98 addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
99 }
100 }
101
102
103 setAllRebuildStates(IndexRebuildService.State.OK);
104
105
106 bundleContext.addServiceListener(new IndexProducerListener(bundleContext),
107 "(objectClass=" + IndexProducer.class.getName() + ")");
108 }
109
110
111
112
113
114
115
116
117
118 @Override
119 public void stop(BundleContext bundleContext) throws Exception {
120
121 unregisterIndexRebuildService();
122 }
123
124
125
126
127
128
129
130
131
132
133
134 public synchronized IndexProducer getIndexProducer(Service service) throws IllegalStateException {
135 if (!indexProducers.containsKey(service)) {
136 throw new IllegalStateException(format("Service %s is not available", service));
137 }
138 return indexProducers.get(service);
139 }
140
141
142
143
144
145
146
147
148
149
150
151
152 public synchronized void rebuildIndex(ElasticsearchIndex index) throws IOException, IndexRebuildException,
153 IllegalArgumentException {
154 index.clear();
155 logger.info("Index cleared, starting complete rebuild.");
156 setAllRebuildStates(IndexRebuildService.State.PENDING);
157 for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
158 rebuildIndexInternal(getIndexProducer(service), DataType.ALL);
159 }
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174 public synchronized void rebuildIndex(IndexProducer indexProducer, DataType dataType)
175 throws IllegalArgumentException, IndexRebuildException {
176 logger.info("Starting partial rebuild of the {} index.", indexProducer.getService());
177 setRebuildState(indexProducer.getService(), IndexRebuildService.State.PENDING);
178 rebuildIndexInternal(indexProducer, dataType);
179 }
180
181
182
183
184
185
186
187
188
189
190
191
192
193 public synchronized void resumeIndexRebuild(Service startingService)
194 throws IllegalArgumentException, IndexRebuildException {
195 logger.info("Resuming rebuild of {} index.", startingService);
196 setSubsetOfRebuildStates(startingService, IndexRebuildService.State.PENDING);
197 Service[] services = IndexRebuildService.Service.values();
198 for (int i = startingService.ordinal(); i < services.length; i++) {
199 rebuildIndexInternal(getIndexProducer(services[i]), DataType.ALL);
200 }
201 }
202
203 private void rebuildIndexInternal(IndexProducer indexProducer, DataType dataType) throws IndexRebuildException,
204 IllegalArgumentException {
205 if (!indexProducer.dataTypeSupported(dataType)) {
206 throw new IllegalArgumentException("Service " + indexProducer.getService() + "doesn't support data type "
207 + dataType + " for index rebuild.");
208 }
209 Service service = indexProducer.getService();
210 logger.info("Starting to rebuild the {} index", service);
211 setRebuildState(service, IndexRebuildService.State.RUNNING);
212 try {
213 indexProducer.repopulate(dataType);
214 setRebuildState(service, IndexRebuildService.State.OK);
215 } catch (IndexRebuildException e) {
216 setRebuildState(service, IndexRebuildService.State.ERROR);
217 throw e;
218 }
219 logger.info("Finished rebuilding the {} index", service);
220 }
221
222
223
224
225
226
227
228
229
230 private void addIndexProducer(IndexProducer indexProducer, BundleContext bundleContext) {
231
232 if (indexProducers.putIfAbsent(indexProducer.getService(), indexProducer) == null) {
233 logger.info("Service {} added.", indexProducer.getService());
234
235
236 if (indexProducers.size() == IndexRebuildService.Service.values().length) {
237 registerIndexRebuildService(bundleContext);
238 }
239 }
240 }
241
242
243
244
245
246
247
248 private void removeIndexProducer(IndexProducer indexProducer) {
249
250 if (indexProducers.remove(indexProducer.getService(), indexProducer)) {
251 logger.info("Service {} removed.", indexProducer.getService());
252
253
254 if (indexProducers.size() != IndexRebuildService.Service.values().length) {
255 unregisterIndexRebuildService();
256 }
257 }
258 }
259
260
261
262
263 private synchronized void unregisterIndexRebuildService() {
264
265 if (serviceRegistration != null) {
266 logger.info("Unregister IndexRebuildService.");
267 serviceRegistration.unregister();
268 serviceRegistration = null;
269 }
270 }
271
272
273
274
275
276
277
278 private synchronized void registerIndexRebuildService(BundleContext bundleContext) {
279
280 if (serviceRegistration == null) {
281 logger.info("Register IndexRebuildService.");
282 serviceRegistration = bundleContext.registerService(this.getClass().getName(), IndexRebuildService.this, null);
283 }
284 }
285
286
287
288
289 private final class IndexProducerListener implements ServiceListener {
290
291 private final BundleContext bundleContext;
292
293
294
295
296
297
298
299 private IndexProducerListener(BundleContext bundleContext) {
300 this.bundleContext = bundleContext;
301 }
302
303 @Override
304 public void serviceChanged(ServiceEvent serviceEvent) {
305
306 if (serviceEvent.getType() == ServiceEvent.REGISTERED) {
307 ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
308 addIndexProducer((IndexProducer) bundleContext.getService(serviceReference), bundleContext);
309
310
311 } else if (serviceEvent.getType() == ServiceEvent.UNREGISTERING) {
312 ServiceReference<?> serviceReference = serviceEvent.getServiceReference();
313 removeIndexProducer((IndexProducer) bundleContext.getService(serviceReference));
314 }
315 }
316 }
317
318 private final Map<Service, State> rebuildStates = new HashMap<>();
319
320
321
322
323 public Map<String, String> getRebuildStates() {
324 Map <String, String> statesAsString = new HashMap<>();
325 for (Map.Entry<IndexRebuildService.Service,IndexRebuildService.State> entry : rebuildStates.entrySet()) {
326 statesAsString.put(entry.getKey().toString(), entry.getValue().toString());
327 }
328 return statesAsString;
329 }
330
331
332
333
334
335
336
337 private void setAllRebuildStates(IndexRebuildService.State state) {
338 for (IndexRebuildService.Service service: IndexRebuildService.Service.values()) {
339 setRebuildState(service, state);
340 }
341 }
342
343
344
345
346
347
348
349
350
351 private void setSubsetOfRebuildStates(IndexRebuildService.Service startingService, IndexRebuildService.State state) {
352 Service[] services = IndexRebuildService.Service.values();
353 for (int i = startingService.ordinal(); i < services.length; i++) {
354 rebuildStates.put(services[i], state);
355 }
356 }
357
358
359
360
361
362
363
364
365
366 private void setRebuildState(IndexRebuildService.Service service, IndexRebuildService.State state) {
367 rebuildStates.put(service, state);
368 }
369 }