Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ protected void runOneIteration() throws Exception {
}

for ( JobDescriptor jd : activeJobs ) {
logger.info( "Submitting work for {}", jd );
logger.debug( "Submitting work for {}", jd );
submitWork( jd );
logger.info( "Work submitted for {}", jd );
logger.debug( "Work submitted for {}", jd );
}
}
}
Expand Down Expand Up @@ -228,7 +228,7 @@ public Void call() throws Exception {
// TODO wrap and throw specifically typed exception for onFailure,
// needs jobId

logger.info( "Starting job {} with execution data {}", job, execution );
logger.debug( "Starting job {} with execution data {}", job, execution );

job.execute( execution );

Expand Down Expand Up @@ -259,7 +259,7 @@ public void onSuccess( Void param ) {

//TODO, refactor into the execution itself for checking if done
if ( execution.getStatus() == Status.IN_PROGRESS ) {
logger.info( "Successful completion of bulkJob {}", execution );
logger.debug( "Successful completion of bulkJob {}", execution );
execution.completed();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ else if ( jobStatus == Status.DEAD ) {
getEm().update( data );
}

logger.info( "Updating stats for job {}", data.getJobName() );
logger.debug( "Updating stats for job {}", data.getJobName() );

getEm().update( stat );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
import org.apache.usergrid.corepersistence.asyncevents.*;
import org.apache.usergrid.corepersistence.export.ExportService;
import org.apache.usergrid.corepersistence.export.ExportServiceImpl;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.migration.CoreMigration;
import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void configureMigrationProvider() {

bind( ReIndexService.class ).to( ReIndexServiceImpl.class );

bind( ExportService.class ).to( ExportServiceImpl.class );

install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
.build( AggregationServiceFactory.class ) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3089,52 +3089,16 @@ public void addIndex(final String newIndexName,final int shards,final int replic
managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency);
}


@Override
public void initializeIndex(){
managerCache.getEntityIndex(applicationScope).initialize();
}
/**
* TODO, these 3 methods are super janky. During refactoring we should clean this model up
*/
public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
try {
long start = System.currentTimeMillis();
// refresh special indexes without calling EntityManager refresh because stack overflow
Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
map.put("some prop", "test");
boolean hasFinished = false;
Entity refreshEntity = create("refresh", map);
EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
= managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();

try {
for (int i = 0; i < 20; i++) {
if (searchCollection(
new SimpleEntityRef(
org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
InflectionUtils.pluralize("refresh"),
Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
).size() > 0
) {
hasFinished = true;
break;
}
int sleepTime = 500;
logger.info("Sleeping {} ms during refreshIndex", sleepTime);
Thread.sleep(sleepTime);

indexRefreshCommandInfo
= managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
}
if(!hasFinished){
throw new RuntimeException("Did not find entity {} during refresh. uuid->"+refreshEntity.getUuid());
}
}finally {
delete(refreshEntity);
}
Thread.sleep(100);

return indexRefreshCommandInfo;
public EntityIndex.IndexRefreshCommandInfo refreshIndex() {
try {
return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
} catch (Exception e) {
throw new RuntimeException("refresh failed",e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ public Results searchCollection( String collectionName, Query query ) throws Exc
final Query toExecute = adjustQuery( query );
final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
final Id ownerId = headEntity.asId();
final boolean analyzeOnly = query.getAnalyzeOnly();


if(query.getLevel() == Level.IDS ){
Expand All @@ -642,6 +643,8 @@ protected Observable<ResultsPage<Id>> buildNewResultsPage(
new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
queryString, cursor );

search.setAnalyzeOnly(analyzeOnly);

return collectionService.searchCollectionIds( search );
}
}.next();
Expand All @@ -658,6 +661,8 @@ protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.En
new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
queryString, cursor );

search.setAnalyzeOnly(analyzeOnly);

return collectionService.searchCollection( search );
}
}.next();
Expand Down Expand Up @@ -919,6 +924,8 @@ public Results searchTargetEntities( Query query ) throws Exception {

headEntity = em.validate( headEntity );

final boolean analyzeOnly = query.getAnalyzeOnly();


final Query toExecute = adjustQuery( query );

Expand Down Expand Up @@ -951,6 +958,7 @@ protected Observable<ResultsPage<ConnectionRef>> buildNewResultsPage( final Opti
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
queryString, cursor, isConnecting );
search.setAnalyzeOnly(analyzeOnly);
return connectionService.searchConnectionAsRefs( search );
}
}.next();
Expand All @@ -966,6 +974,7 @@ protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.En
final ConnectionSearch search =
new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
queryString, cursor, isConnecting );
search.setAnalyzeOnly(analyzeOnly);
return connectionService.searchConnection( search );
}
}.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ else if (event instanceof ElasticsearchIndexEvent) {
if( !(event instanceof ElasticsearchIndexEvent)
&& !(event instanceof InitializeApplicationIndexEvent)
&& single.isEmpty() ){
logger.warn("No index operation messages came back from event processing for msg: {} ",
message.getStringBody().trim());
logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
}


Expand Down Expand Up @@ -481,15 +481,7 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
entity.getId().getUuid(), entity.getId().getType());

offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, entity.getId()), 0));

final EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);

final IndexOperationMessage indexMessage =
eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);

queueIndexOperationMessage( indexMessage, false);
new EntityIdScope(applicationScope, entity.getId()), updatedAfter));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.inject.Provider;
import com.google.inject.Singleton;

import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED;
import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;


Expand Down Expand Up @@ -121,6 +122,10 @@ private AsyncEventService getIndexService() {
asyncEventService.MAX_TAKE = 1000;
}

if ( impl.equals( DISTRIBUTED )) {
asyncEventService.MAX_TAKE = 500;
}

return asyncEventService;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.export;


import com.google.common.base.Optional;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;

import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
* A builder interface to build our re-index request
*/
public interface ExportRequestBuilder {

/**
* Set the application id
*/
ExportRequestBuilder withApplicationId(final UUID applicationId);


/**
* Get the application scope
* @return
*/
Optional<ApplicationScope> getApplicationScope();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.export;


import com.google.common.base.Optional;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;

import java.util.UUID;
import java.util.concurrent.TimeUnit;


/**
* Index service request builder
*/
public class ExportRequestBuilderImpl implements ExportRequestBuilder {

private Optional<UUID> withApplicationId = Optional.absent();
private Optional<String> withCollectionName = Optional.absent();
private Optional<String> cursor = Optional.absent();
private Optional<Long> updateTimestamp = Optional.absent();
private Optional<Integer> delayTimer = Optional.absent();
private Optional<TimeUnit> timeUnitOptional = Optional.absent();


/***
*
* @param applicationId The application id
* @return
*/
@Override
public ExportRequestBuilder withApplicationId(final UUID applicationId ) {
this.withApplicationId = Optional.fromNullable( applicationId );
return this;
}

@Override
public Optional<ApplicationScope> getApplicationScope() {

if ( this.withApplicationId.isPresent() ) {
return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
}

return Optional.absent();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.export;


import java.io.IOException;
import java.io.OutputStream;

/**
* An interface for exporting all entities within an application
*/
public interface ExportService {


/**
* Perform an application export into the provided OutputStream
*
* @param exportRequestBuilder The builder to build the request
*/
void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException;


/**
* Generate a builder for the export request
*/
ExportRequestBuilder getBuilder();


enum Status{
STARTED, INPROGRESS, COMPLETE, UNKNOWN;
}
}
Loading