diff --git a/.github/workflows/mqttClientTest.yml b/.github/workflows/mqttClientTest.yml new file mode 100644 index 0000000000..946ae5fd57 --- /dev/null +++ b/.github/workflows/mqttClientTest.yml @@ -0,0 +1,51 @@ +name: Polypheny-DB MQTT Client Test + +on: + push: { branches: mqtt-interface } + +jobs: + build: + runs-on: ubuntu-latest + name: MQTT Client Tests (Java 17) + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up JDK + uses: actions/setup-java@v1 + with: + java-version: 17 + - name: Set env variable + run: | + echo "POLYPHENY_HOME=$GITHUB_WORKSPACE" >> $GITHUB_ENV + - name: Create folders for certs + run: | + mkdir $POLYPHENY_HOME/.polypheny + mkdir $POLYPHENY_HOME/.polypheny/certs + + - name: Start Mosquitto broker in docker container + uses: namoshek/mosquitto-github-action@v1 + with: + version: '1.6' + ports: '1883:1883' + container-name: 'mqttBroker' + + - name: Assemble + uses: nick-invision/retry@v2 + with: + max_attempts: 2 + timeout_minutes: 60 + command: ./gradlew assemble + + - name: Build Plugins + uses: nick-invision/retry@v2 + with: + max_attempts: 1 + timeout_minutes: 60 + command: ./gradlew assemblePlugins + + - name: Execute MQTT client tests + uses: nick-invision/retry@v2 + with: + max_attempts: 1 + timeout_minutes: 30 + command: ./gradlew mqttTests \ No newline at end of file diff --git a/core/src/main/java/org/polypheny/db/adapter/Adapter.java b/core/src/main/java/org/polypheny/db/adapter/Adapter.java index d5b97e93fb..f562859d80 100644 --- a/core/src/main/java/org/polypheny/db/adapter/Adapter.java +++ b/core/src/main/java/org/polypheny/db/adapter/Adapter.java @@ -53,6 +53,7 @@ import org.polypheny.db.catalog.entity.CatalogColumnPlacement; import org.polypheny.db.catalog.entity.CatalogPartitionPlacement; import org.polypheny.db.catalog.entity.CatalogTable; +import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException; import org.polypheny.db.config.Config; import org.polypheny.db.config.Config.ConfigListener; import org.polypheny.db.config.ConfigDocker; @@ -406,7 +407,13 @@ public void updateSettings( Map newSettings ) { this.validateSettings( newSettings, false ); List updatedSettings = this.applySettings( newSettings ); this.reloadSettings( updatedSettings ); - Catalog.getInstance().updateAdapterSettings( getAdapterId(), newSettings ); + Catalog catalog = Catalog.getInstance(); + catalog.updateAdapterSettings( getAdapterId(), getCurrentSettings() ); + try { + catalog.commit(); + } catch ( NoTablePrimaryKeyException e ) { + throw new RuntimeException( e ); + } } diff --git a/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java b/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java new file mode 100644 index 0000000000..7bfd0e2a14 --- /dev/null +++ b/core/src/main/java/org/polypheny/db/adapter/enumerable/EnumerableDocumentValues.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.adapter.enumerable; + +import com.google.common.collect.ImmutableList; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.bson.BsonValue; +import org.polypheny.db.adapter.java.JavaTypeFactory; +import org.polypheny.db.algebra.core.document.DocumentValues; +import org.polypheny.db.algebra.type.AlgDataType; +import org.polypheny.db.algebra.type.AlgDataTypeField; +import org.polypheny.db.plan.AlgOptCluster; +import org.polypheny.db.plan.AlgTraitSet; +import org.polypheny.db.schema.ModelTrait; +import org.polypheny.db.util.BuiltInMethod; + +public class EnumerableDocumentValues extends DocumentValues implements EnumerableAlg { + + /** + * Creates a {@link DocumentValues}. + * {@link ModelTrait#DOCUMENT} node, which contains values. + * + * @param cluster + * @param traitSet + * @param rowType + * @param documentTuples + */ + public EnumerableDocumentValues( AlgOptCluster cluster, AlgTraitSet traitSet, AlgDataType rowType, List documentTuples ) { + super( cluster, traitSet, rowType, ImmutableList.copyOf( documentTuples ) ); + } + + + @Override + public Result implement( EnumerableAlgImplementor implementor, Prefer pref ) { + final JavaTypeFactory typeFactory = (JavaTypeFactory) getCluster().getTypeFactory(); + final BlockBuilder builder = new BlockBuilder(); + final PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + getRowType(), + pref.preferCustom() ); + final Type rowClass = physType.getJavaRowType(); + + final List expressions = new ArrayList<>(); + final List fields = rowType.getFieldList(); + for ( BsonValue doc : documentTuples ) { + final List literals = new ArrayList<>(); + + literals.add( Expressions.constant( doc.asDocument().toJson() ) ); + + expressions.add( physType.record( literals ) ); + } + builder.add( + Expressions.return_( + null, + Expressions.call( + BuiltInMethod.AS_ENUMERABLE.method, + Expressions.newArrayInit( Primitive.box( rowClass ), expressions ) ) ) ); + return implementor.result( physType, builder.toBlock() ); + } + +} diff --git a/core/src/main/java/org/polypheny/db/algebra/logical/document/LogicalDocumentValues.java b/core/src/main/java/org/polypheny/db/algebra/logical/document/LogicalDocumentValues.java index bb5bf83111..083503b78e 100644 --- a/core/src/main/java/org/polypheny/db/algebra/logical/document/LogicalDocumentValues.java +++ b/core/src/main/java/org/polypheny/db/algebra/logical/document/LogicalDocumentValues.java @@ -38,6 +38,7 @@ import org.polypheny.db.plan.AlgTraitSet; import org.polypheny.db.plan.Convention; import org.polypheny.db.rex.RexLiteral; +import org.polypheny.db.schema.ModelTrait; import org.polypheny.db.type.PolyType; import org.polypheny.db.type.PolyTypeFactoryImpl; import org.polypheny.db.util.BsonUtil; @@ -77,7 +78,7 @@ public class LogicalDocumentValues extends DocumentValues implements RelationalT * @param tuples the documents in their native BSON format */ public LogicalDocumentValues( AlgOptCluster cluster, AlgDataType defaultRowType, AlgTraitSet traitSet, ImmutableList tuples ) { - super( cluster, traitSet, defaultRowType, tuples ); + super( cluster, traitSet.replace( ModelTrait.DOCUMENT ), defaultRowType, tuples ); } diff --git a/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java 
b/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java index 0530c25fff..d1570b9615 100644 --- a/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java +++ b/core/src/main/java/org/polypheny/db/algebra/rules/DocumentToEnumerableRule.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.polypheny.db.adapter.enumerable.EnumerableAggregate; import org.polypheny.db.adapter.enumerable.EnumerableConvention; +import org.polypheny.db.adapter.enumerable.EnumerableDocumentValues; import org.polypheny.db.adapter.enumerable.EnumerableFilter; import org.polypheny.db.adapter.enumerable.EnumerableLimit; import org.polypheny.db.adapter.enumerable.EnumerableProject; @@ -30,6 +31,7 @@ import org.polypheny.db.algebra.logical.document.LogicalDocumentFilter; import org.polypheny.db.algebra.logical.document.LogicalDocumentProject; import org.polypheny.db.algebra.logical.document.LogicalDocumentSort; +import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.plan.AlgOptRule; import org.polypheny.db.plan.AlgOptRuleCall; import org.polypheny.db.plan.AlgOptRuleOperand; @@ -43,6 +45,8 @@ public class DocumentToEnumerableRule extends AlgOptRule { public static DocumentToEnumerableRule FILTER_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.FILTER, operand( LogicalDocumentFilter.class, any() ), "DOCUMENT_FILTER_TO_ENUMERABLE" ); public static DocumentToEnumerableRule SORT_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.SORT, operand( LogicalDocumentSort.class, any() ), "DOCUMENT_SORT_TO_ENUMERABLE" ); + public static DocumentToEnumerableRule VALUES_TO_ENUMERABLE = new DocumentToEnumerableRule( Type.VALUES, operand( LogicalDocumentValues.class, any() ), "DOCUMENT_VALUES_TO_ENUMERABLE" ); + private final Type type; @@ -62,6 +66,8 @@ public void onMatch( AlgOptRuleCall call ) { convertAggregate( call ); } else if ( type == Type.SORT ) { convertSort( call ); + } else if ( type == Type.VALUES ) { + convertValues( call ); } else { throw new UnsupportedOperationException( "This document is not supported." 
); } @@ -69,6 +75,17 @@ public void onMatch( AlgOptRuleCall call ) { } + private void convertValues( AlgOptRuleCall call ) { + LogicalDocumentValues values = call.alg( 0 ); + AlgTraitSet out = values.getTraitSet().replace( EnumerableConvention.INSTANCE ); + + EnumerableDocumentValues enumerable = new EnumerableDocumentValues( values.getCluster(), out, values.getRowType(), values.documentTuples ); + call.transformTo( enumerable ); + + // call.transformTo( values.getRelationalEquivalent() ); + } + + private void convertSort( AlgOptRuleCall call ) { LogicalDocumentSort sort = call.alg( 0 ); AlgTraitSet out = sort.getTraitSet().replace( EnumerableConvention.INSTANCE ); diff --git a/core/src/main/java/org/polypheny/db/catalog/Catalog.java b/core/src/main/java/org/polypheny/db/catalog/Catalog.java index 644086c921..505d11fcf4 100644 --- a/core/src/main/java/org/polypheny/db/catalog/Catalog.java +++ b/core/src/main/java/org/polypheny/db/catalog/Catalog.java @@ -1233,6 +1233,14 @@ protected final boolean isValidIdentifier( final String str ) { */ public abstract void deleteQueryInterface( int ifaceId ); + /** + * Update settings of a query interface + * + * @param queryInterfaceId The id of the query interface + * @param newSettings The new settings for the query interface + */ + public abstract void updateQueryInterfaceSettings( int queryInterfaceId, Map newSettings ); + /** * Adds a partition to the catalog * @@ -1926,7 +1934,7 @@ public enum EntityType { SOURCE( 2 ), VIEW( 3 ), MATERIALIZED_VIEW( 4 ); - // STREAM, ... + //STREAM(5); private final int id; diff --git a/core/src/main/java/org/polypheny/db/catalog/entity/CatalogCollection.java b/core/src/main/java/org/polypheny/db/catalog/entity/CatalogCollection.java index 19b65f4d47..18b1690c3f 100644 --- a/core/src/main/java/org/polypheny/db/catalog/entity/CatalogCollection.java +++ b/core/src/main/java/org/polypheny/db/catalog/entity/CatalogCollection.java @@ -61,13 +61,13 @@ public Serializable[] getParameterArray() { public CatalogCollection addPlacement( int adapterId ) { List placements = new ArrayList<>( this.placements ); placements.add( adapterId ); - return new CatalogCollection( databaseId, namespaceId, id, name, placements, EntityType.ENTITY, physicalName ); + return new CatalogCollection( databaseId, namespaceId, id, name, placements, entityType, physicalName ); } public CatalogCollection removePlacement( int adapterId ) { List placements = this.placements.stream().filter( id -> id != adapterId ).collect( Collectors.toList() ); - return new CatalogCollection( databaseId, namespaceId, id, name, placements, EntityType.ENTITY, physicalName ); + return new CatalogCollection( databaseId, namespaceId, id, name, placements, entityType, physicalName ); } diff --git a/core/src/main/java/org/polypheny/db/config/RuntimeConfig.java b/core/src/main/java/org/polypheny/db/config/RuntimeConfig.java index bd85e9f135..9b8d03955f 100644 --- a/core/src/main/java/org/polypheny/db/config/RuntimeConfig.java +++ b/core/src/main/java/org/polypheny/db/config/RuntimeConfig.java @@ -159,21 +159,21 @@ public enum RuntimeConfig { DYNAMIC_QUERYING( "statistics/useDynamicQuerying", "Use statistics for query assistance.", - true, + false, ConfigType.BOOLEAN, "statisticSettingsGroup" ), STATISTICS_ON_STARTUP( "statistics/statisticsOnStartup", "Whether to build statistics for all stored data on system startup.", - true, + false, ConfigType.BOOLEAN, "statisticSettingsGroup" ), ACTIVE_TRACKING( "statistics/activeTracking", "All transactions are tracked and statistics 
collected during execution.", - true, + false, ConfigType.BOOLEAN, "statisticSettingsGroup" ), @@ -418,7 +418,7 @@ public enum RuntimeConfig { MONITORING_QUEUE_ACTIVE( "runtime/monitoringQueueActive", "Enables automatic monitoring of executed events in workload monitoring. If disabled no events are captured, hence the queue remains empty. This also effects routing!", - true, + false, ConfigType.BOOLEAN, "monitoringSettingsQueueGroup" ), diff --git a/core/src/main/java/org/polypheny/db/iface/QueryInterface.java b/core/src/main/java/org/polypheny/db/iface/QueryInterface.java index afa8a50e5c..c0e5ee1c5d 100644 --- a/core/src/main/java/org/polypheny/db/iface/QueryInterface.java +++ b/core/src/main/java/org/polypheny/db/iface/QueryInterface.java @@ -20,6 +20,7 @@ import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -27,6 +28,8 @@ import lombok.AllArgsConstructor; import lombok.Getter; import org.pf4j.ExtensionPoint; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException; import org.polypheny.db.languages.LanguageManager; import org.polypheny.db.transaction.TransactionManager; @@ -61,10 +64,15 @@ public QueryInterface( this.authenticator = authenticator; this.queryInterfaceId = queryInterfaceId; this.uniqueName = uniqueName; - this.settings = settings; + //this.settings = settings; this.supportsDml = supportsDml; this.supportsDdl = supportsDdl; + this.settings = new HashMap<>(settings.size()); + for ( Map.Entry entry : settings.entrySet()) { + this.settings.put(entry.getKey(), entry.getValue()); + } + LanguageManager.getINSTANCE().addObserver( this ); } @@ -120,6 +128,13 @@ public void updateSettings( Map newSettings ) { this.validateSettings( newSettings, false ); List updatedSettings = this.applySettings( newSettings ); this.reloadSettings( updatedSettings ); + Catalog catalog = Catalog.getInstance(); + Catalog.getInstance().updateQueryInterfaceSettings( getQueryInterfaceId(), getCurrentSettings() ); + try { + catalog.commit(); + } catch ( NoTablePrimaryKeyException e ) { + throw new RuntimeException( e ); + } } diff --git a/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java b/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java index e6abfe2f19..f831335452 100644 --- a/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java +++ b/core/src/main/java/org/polypheny/db/plan/volcano/VolcanoPlanner.java @@ -830,6 +830,7 @@ public void registerModelRules() { addRule( DocumentToEnumerableRule.FILTER_TO_ENUMERABLE ); addRule( DocumentToEnumerableRule.AGGREGATE_TO_ENUMERABLE ); addRule( DocumentToEnumerableRule.SORT_TO_ENUMERABLE ); + addRule( DocumentToEnumerableRule.VALUES_TO_ENUMERABLE ); // Relational } diff --git a/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java b/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java index 1ba3b3de60..1e75b09b79 100644 --- a/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java +++ b/core/src/main/java/org/polypheny/db/processing/LogicalAlgAnalyzeShuttle.java @@ -38,6 +38,7 @@ import org.polypheny.db.algebra.logical.document.LogicalDocumentScan; import org.polypheny.db.algebra.logical.document.LogicalDocumentSort; import org.polypheny.db.algebra.logical.document.LogicalDocumentTransformer; +import 
org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.algebra.logical.lpg.LogicalGraph; import org.polypheny.db.algebra.logical.lpg.LogicalLpgAggregate; import org.polypheny.db.algebra.logical.lpg.LogicalLpgFilter; @@ -261,6 +262,11 @@ public AlgNode visit( LogicalDocumentScan scan ) { return super.visit( scan ); } + public AlgNode visit( LogicalDocumentValues values ) { + hashBasis.add( "LogicalDocumentValues" ); + return super.visit( values ); + } + @Override public AlgNode visit( LogicalDocumentSort sort ) { diff --git a/core/src/main/java/org/polypheny/db/runtime/functions/MqlFunctions.java b/core/src/main/java/org/polypheny/db/runtime/functions/MqlFunctions.java index ab7468ff15..d3e3cc5672 100644 --- a/core/src/main/java/org/polypheny/db/runtime/functions/MqlFunctions.java +++ b/core/src/main/java/org/polypheny/db/runtime/functions/MqlFunctions.java @@ -781,6 +781,8 @@ private static Object transformBsonToPrimitive( BsonValue doc ) { return doc.asDocument().entrySet().stream().collect( Collectors.toMap( Entry::getKey, e -> transformBsonToPrimitive( e.getValue() ) ) ); case ARRAY: return doc.asArray().stream().map( MqlFunctions::transformBsonToPrimitive ).collect( Collectors.toList() ); + case BOOLEAN: + return doc.asBoolean().getValue(); default: return null; } diff --git a/core/src/main/java/org/polypheny/db/stream/StreamMessage.java b/core/src/main/java/org/polypheny/db/stream/StreamMessage.java new file mode 100644 index 0000000000..2a39f09361 --- /dev/null +++ b/core/src/main/java/org/polypheny/db/stream/StreamMessage.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.stream; + +public interface StreamMessage { + + + T getData(); +} diff --git a/core/src/main/java/org/polypheny/db/stream/StreamProcessor.java b/core/src/main/java/org/polypheny/db/stream/StreamProcessor.java new file mode 100644 index 0000000000..5a3f3511e2 --- /dev/null +++ b/core/src/main/java/org/polypheny/db/stream/StreamProcessor.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.stream; + +/** + * + */ +public interface StreamProcessor { + + String getStream( ); + + boolean isNumber ( String value ); + + boolean isBoolean( String value); + + + +} diff --git a/core/src/main/java/org/polypheny/db/tools/AlgBuilder.java b/core/src/main/java/org/polypheny/db/tools/AlgBuilder.java index 87a5bf4ec8..94b81a83b2 100644 --- a/core/src/main/java/org/polypheny/db/tools/AlgBuilder.java +++ b/core/src/main/java/org/polypheny/db/tools/AlgBuilder.java @@ -34,20 +34,67 @@ package org.polypheny.db.tools; +import static org.polypheny.db.util.Static.RESOURCE; + import com.google.common.base.Preconditions; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.math.BigDecimal; +import java.util.AbstractList; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.Getter; import org.apache.calcite.linq4j.Ord; import org.apache.calcite.linq4j.function.Experimental; +import org.bson.BsonDocument; +import org.bson.BsonElement; import org.bson.BsonValue; -import org.polypheny.db.algebra.*; +import org.polypheny.db.algebra.AlgCollation; +import org.polypheny.db.algebra.AlgCollations; +import org.polypheny.db.algebra.AlgDistribution; +import org.polypheny.db.algebra.AlgFieldCollation; +import org.polypheny.db.algebra.AlgNode; import org.polypheny.db.algebra.constant.Kind; import org.polypheny.db.algebra.constant.SemiJoinType; -import org.polypheny.db.algebra.core.*; +import org.polypheny.db.algebra.core.Aggregate; +import org.polypheny.db.algebra.core.AggregateCall; +import org.polypheny.db.algebra.core.AlgFactories; import org.polypheny.db.algebra.core.AlgFactories.ScanFactory; +import org.polypheny.db.algebra.core.CorrelationId; +import org.polypheny.db.algebra.core.Filter; +import org.polypheny.db.algebra.core.Intersect; +import org.polypheny.db.algebra.core.Join; +import org.polypheny.db.algebra.core.JoinAlgType; +import org.polypheny.db.algebra.core.Match; +import org.polypheny.db.algebra.core.Minus; +import org.polypheny.db.algebra.core.Modify.Operation; +import org.polypheny.db.algebra.core.Project; +import org.polypheny.db.algebra.core.Scan; +import org.polypheny.db.algebra.core.SemiJoin; +import org.polypheny.db.algebra.core.Sort; +import org.polypheny.db.algebra.core.Union; +import org.polypheny.db.algebra.core.Values; import org.polypheny.db.algebra.fun.AggFunction; +import org.polypheny.db.algebra.logical.document.LogicalDocumentModify; import org.polypheny.db.algebra.logical.document.LogicalDocumentProject; import org.polypheny.db.algebra.logical.document.LogicalDocumentScan; +import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.algebra.logical.lpg.LogicalGraph; import org.polypheny.db.algebra.logical.lpg.LogicalLpgMatch; import org.polypheny.db.algebra.logical.lpg.LogicalLpgProject; @@ -63,8 +110,24 @@ import org.polypheny.db.languages.OperatorRegistry; import org.polypheny.db.languages.QueryLanguage; import 
org.polypheny.db.nodes.Operator; -import org.polypheny.db.plan.*; -import org.polypheny.db.rex.*; +import org.polypheny.db.plan.AlgOptCluster; +import org.polypheny.db.plan.AlgOptPredicateList; +import org.polypheny.db.plan.AlgOptSchema; +import org.polypheny.db.plan.AlgOptTable; +import org.polypheny.db.plan.AlgOptUtil; +import org.polypheny.db.plan.Context; +import org.polypheny.db.plan.Contexts; +import org.polypheny.db.prepare.PolyphenyDbCatalogReader; +import org.polypheny.db.rex.RexBuilder; +import org.polypheny.db.rex.RexCall; +import org.polypheny.db.rex.RexCorrelVariable; +import org.polypheny.db.rex.RexExecutor; +import org.polypheny.db.rex.RexInputRef; +import org.polypheny.db.rex.RexLiteral; +import org.polypheny.db.rex.RexNode; +import org.polypheny.db.rex.RexShuttle; +import org.polypheny.db.rex.RexSimplify; +import org.polypheny.db.rex.RexUtil; import org.polypheny.db.runtime.Hook; import org.polypheny.db.runtime.PolyCollections.PolyDictionary; import org.polypheny.db.schema.ModelTrait; @@ -72,17 +135,21 @@ import org.polypheny.db.schema.graph.PolyNode; import org.polypheny.db.transaction.Statement; import org.polypheny.db.type.PolyType; -import org.polypheny.db.util.*; +import org.polypheny.db.util.DateString; +import org.polypheny.db.util.Holder; +import org.polypheny.db.util.ImmutableBitSet; +import org.polypheny.db.util.ImmutableIntList; +import org.polypheny.db.util.ImmutableNullableList; +import org.polypheny.db.util.Litmus; +import org.polypheny.db.util.NlsString; +import org.polypheny.db.util.Pair; +import org.polypheny.db.util.TimeString; +import org.polypheny.db.util.TimestampString; +import org.polypheny.db.util.Util; +import org.polypheny.db.util.ValidatorUtil; import org.polypheny.db.util.mapping.Mapping; import org.polypheny.db.util.mapping.Mappings; -import javax.annotation.Nonnull; -import java.math.BigDecimal; -import java.util.*; -import java.util.stream.Collectors; - -import static org.polypheny.db.util.Static.RESOURCE; - /** * Builder for relational expressions. @@ -230,6 +297,12 @@ public static AlgBuilder create( Statement statement ) { return create( statement, cluster ); } + public static AlgBuilder createDocumentBuilder( Statement statement ) { + final RexBuilder rexBuilder = new RexBuilder( statement.getTransaction().getTypeFactory() ); + final AlgOptCluster cluster = AlgOptCluster.createDocument( statement.getQueryProcessor().getPlanner(), rexBuilder ); + return create( statement, cluster ); + } + public static AlgBuilder create( Statement statement, AlgOptCluster cluster ) { return new AlgBuilder( Contexts.EMPTY_CONTEXT, cluster, statement.getTransaction().getCatalogReader() ); @@ -2406,7 +2479,51 @@ public AlgBuilder permute( Mapping mapping ) { public AlgBuilder aggregate( GroupKey groupKey, List aggregateCalls ) { - return aggregate( groupKey, Lists.transform( aggregateCalls, AggCallImpl2::new ) ); + return aggregate( groupKey, aggregateCalls.stream().map( AggCallImpl2::new ).collect( Collectors.toList() ) ); + } + + // DOCUMENT SCAN + + + public AlgBuilder docScan( Statement statement, String collectionName ) { + PolyphenyDbCatalogReader catalogReader = statement.getTransaction().getCatalogReader(); + AlgOptTable collection = catalogReader.getCollection( List.of( collectionName.split( "\\." ) ) ); + stack.push( new Frame( LogicalDocumentScan.create( cluster, collection ) ) ); + return this; + } + + + // DOCUMENT MODIFY + public AlgBuilder docInsert( Statement statement, String collectionName, BsonDocument... 
documents ) { + PolyphenyDbCatalogReader catalogReader = statement.getTransaction().getCatalogReader(); + AlgOptTable collection = catalogReader.getCollection( List.of( collectionName.split( "\\." ) ) ); + + AlgNode values = LogicalDocumentValues.create( cluster, ImmutableList.copyOf( documents ) ); + + LogicalDocumentModify modify = LogicalDocumentModify.create( collection, values, catalogReader, Operation.INSERT, null, null ); + + stack.push( new Frame( modify ) ); + return this; + } + + + public AlgBuilder docInsert( Statement statement, String collectionName, BsonElement... elements ) { + return docInsert( statement, collectionName, new BsonDocument( List.of( elements ) ) ); + } + + + public AlgBuilder docInsert( Statement statement, String collectionName, String key, BsonValue value ) { + return docInsert( statement, collectionName, new BsonDocument( key, value ) ); + } + + + public AlgBuilder docInsert( Statement statement, String collectionName, String key0, BsonValue value0, String key1, BsonValue value1 ) { + return docInsert( statement, collectionName, new BsonElement( key0, value0 ), new BsonElement( key1, value1 ) ); + } + + + public AlgBuilder docInsert( Statement statement, String collectionName, String key0, BsonValue value0, String key1, BsonValue value1, String key2, BsonValue value2 ) { + return docInsert( statement, collectionName, new BsonElement( key0, value0 ), new BsonElement( key1, value1 ), new BsonElement( key2, value2 ) ); } diff --git a/core/src/main/java/org/polypheny/db/transaction/Statement.java b/core/src/main/java/org/polypheny/db/transaction/Statement.java index 7f88d9f958..3704b02c01 100644 --- a/core/src/main/java/org/polypheny/db/transaction/Statement.java +++ b/core/src/main/java/org/polypheny/db/transaction/Statement.java @@ -21,6 +21,7 @@ import org.polypheny.db.prepare.Context; import org.polypheny.db.monitoring.events.StatementEvent; import org.polypheny.db.processing.QueryProcessor; +import org.polypheny.db.stream.StreamProcessor; import org.polypheny.db.util.FileInputHandle; public interface Statement { @@ -29,6 +30,8 @@ public interface Statement { QueryProcessor getQueryProcessor(); + StreamProcessor getStreamProcessor(String message); + DataContext getDataContext(); Context getPrepareContext(); diff --git a/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java b/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java index 885ffae660..e44764358a 100644 --- a/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java +++ b/core/src/test/java/org/polypheny/db/catalog/MockCatalog.java @@ -893,6 +893,12 @@ public void deleteQueryInterface( int ifaceId ) { } + @Override + public void updateQueryInterfaceSettings( int queryInterfaceId, Map newSettings ) { + throw new NotImplementedException(); + } + + @Override public long addPartitionGroup( long tableId, String partitionGroupName, long schemaId, PartitionType partitionType, long numberOfInternalPartitions, List effectivePartitionGroupQualifier, boolean isUnbound ) throws GenericCatalogException { throw new NotImplementedException(); diff --git a/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java b/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java index 93b549f3b7..25ae8c266e 100644 --- a/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java +++ b/dbms/src/main/java/org/polypheny/db/routing/routers/AbstractDqlRouter.java @@ -37,6 +37,7 @@ import org.polypheny.db.algebra.core.lpg.LpgAlg.NodeType; import 
org.polypheny.db.algebra.logical.common.LogicalTransformer; import org.polypheny.db.algebra.logical.document.LogicalDocumentScan; +import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; import org.polypheny.db.algebra.logical.lpg.LogicalLpgScan; import org.polypheny.db.algebra.logical.relational.LogicalModify; import org.polypheny.db.algebra.logical.relational.LogicalScan; @@ -172,6 +173,7 @@ public AlgNode routeDocument( RoutedAlgBuilder builder.push( handleDocumentScan( (DocumentScan) alg, statement, builder, null ).build() ); return alg; } else if ( alg.getDocType() == DocType.VALUES ) { + builder.push( handleDocuments( (LogicalDocumentValues) alg, builder ).build() ); return alg; } throw new UnsupportedOperationException();
diff --git a/dbms/src/main/java/org/polypheny/db/schema/PolySchemaBuilder.java b/dbms/src/main/java/org/polypheny/db/schema/PolySchemaBuilder.java index 5a09a8b0b6..b677adc0fa 100644 --- a/dbms/src/main/java/org/polypheny/db/schema/PolySchemaBuilder.java +++ b/dbms/src/main/java/org/polypheny/db/schema/PolySchemaBuilder.java @@ -32,6 +32,7 @@ import org.polypheny.db.adapter.Adapter; import org.polypheny.db.adapter.AdapterManager; import org.polypheny.db.adapter.DataContext; +import org.polypheny.db.adapter.java.AbstractQueryableTable; import org.polypheny.db.algebra.logical.lpg.LogicalGraph; import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.algebra.type.AlgDataTypeFactory;
diff --git a/dbms/src/main/java/org/polypheny/db/stream/StreamProcessorImpl.java b/dbms/src/main/java/org/polypheny/db/stream/StreamProcessorImpl.java new file mode 100644 index 0000000000..1cfc4fdcda --- /dev/null +++ b/dbms/src/main/java/org/polypheny/db/stream/StreamProcessorImpl.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.stream; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.polypheny.db.PolyImplementation; +import org.polypheny.db.algebra.AlgRoot; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.TransactionException; + +@Slf4j +public class StreamProcessorImpl implements StreamProcessor { + + String stream; + + + public StreamProcessorImpl( String stream ) { + this.stream = stream; + } + + + @Override + public String getStream() { + return stream; + } + + + public boolean isNumber( String value ) { + try { + Double.parseDouble( value ); + } catch ( NumberFormatException e ) { + return false; + } + return true; + } + + + public boolean isBoolean( String value ) { + return value.equals( "true" ) || value.equals( "false" ); + } + + + protected List<List<Object>> executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement ) { + + try { + PolyImplementation result = statement.getQueryProcessor().prepareQuery( algRoot, false ); + log.debug( "AlgRoot was prepared." ); + List<List<Object>> rows = result.getRows( statement, -1 ); + statement.getTransaction().commit(); + return rows; + } catch ( Throwable e ) { + log.error( "Error during execution of stream processor query", e ); + try { + statement.getTransaction().rollback(); + } catch ( TransactionException transactionException ) { + log.error( "Could not rollback", transactionException ); + } + return null; + } + } + +}
diff --git a/dbms/src/main/java/org/polypheny/db/transaction/StatementImpl.java b/dbms/src/main/java/org/polypheny/db/transaction/StatementImpl.java index 4ff6444b05..50b8650d61 100644 --- a/dbms/src/main/java/org/polypheny/db/transaction/StatementImpl.java +++ b/dbms/src/main/java/org/polypheny/db/transaction/StatementImpl.java @@ -35,6 +35,8 @@ import org.polypheny.db.processing.QueryProcessor; import org.polypheny.db.processing.QueryProviderImpl; import org.polypheny.db.processing.VolcanoQueryProcessor; +import org.polypheny.db.stream.StreamProcessor; +import org.polypheny.db.stream.StreamProcessorImpl; import org.polypheny.db.util.FileInputHandle; public class StatementImpl implements Statement { @@ -73,6 +75,12 @@ public QueryProcessor getQueryProcessor() { } + @Override + public StreamProcessor getStreamProcessor( String message ) { + return new StreamProcessorImpl( message ); + } + + @Override public DataContext getDataContext() { if ( dataContext == null ) {
diff --git a/gradle.properties b/gradle.properties index 9550469503..5579dfc39e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -69,6 +69,7 @@ h2_version = 1.4.197 hadoop_client_version = 2.7.5 hadoop_common_version = 2.7.5 hamcrest_core_version = 1.3 +hivemq_mqttclient_version = 1.3.1 hsqldb_version = 2.6.1 httpclient_version = 4.5.6 httpcore_version = 4.4.10
diff --git a/plugins/cypher-language/src/main/java/org/polypheny/db/cypher/parser/CypherCharStream.java b/plugins/cypher-language/src/main/java/org/polypheny/db/cypher/parser/CypherCharStream.java index 0db4857162..6805c3b6a2 100644 --- a/plugins/cypher-language/src/main/java/org/polypheny/db/cypher/parser/CypherCharStream.java +++ b/plugins/cypher-language/src/main/java/org/polypheny/db/cypher/parser/CypherCharStream.java @@ -170,7 +170,7 @@ private char convertUnicode( char c ) { hexval( nextQueryChar() )); } catch ( final IOException e ) { throw new RuntimeException( e.getMessage() ); - //throw new RuntimeException( e.getMessage(), queryCursor, queryCursorLine, queryCursorColumn ); + //throw new RuntimeException( e.getData(), queryCursor, queryCursorLine, queryCursorColumn ); } }
diff --git a/plugins/mapdb-catalog/src/main/java/org/polypheny/db/catalog/CatalogImpl.java b/plugins/mapdb-catalog/src/main/java/org/polypheny/db/catalog/CatalogImpl.java index 47d23dc131..7bd5c33a95 100644 --- a/plugins/mapdb-catalog/src/main/java/org/polypheny/db/catalog/CatalogImpl.java +++ b/plugins/mapdb-catalog/src/main/java/org/polypheny/db/catalog/CatalogImpl.java @@ -2420,7 +2420,7 @@ public long addCollection( Long id, String name, long schemaId, int currentUserI collectionId, name, List.of(), - EntityType.ENTITY, + entity, null ); synchronized ( this ) { @@ -4014,6 +4014,21 @@ public void deleteQueryInterface( int ifaceId ) { } } + /** + * {@inheritDoc} + */ + public void updateQueryInterfaceSettings( int queryInterfaceId, Map newSettings ) { + CatalogQueryInterface old = getQueryInterface( queryInterfaceId ); + Map temp = new HashMap<>(); + newSettings.forEach( temp::put ); + CatalogQueryInterface queryInterface = new CatalogQueryInterface( old.id, old.name, old.clazz, temp ); + synchronized ( this )
{ + queryInterfaces.put( queryInterface.id, queryInterface ); + queryInterfaceNames.put( queryInterface.name, queryInterface ); + } + listeners.firePropertyChange( "queryInterface", old, queryInterface ); + } + /** * {@inheritDoc} diff --git a/plugins/mql-language/src/main/java/org/polypheny/db/languages/mql2alg/MqlToAlgConverter.java b/plugins/mql-language/src/main/java/org/polypheny/db/languages/mql2alg/MqlToAlgConverter.java index 50a0fb262b..6e17253cc5 100644 --- a/plugins/mql-language/src/main/java/org/polypheny/db/languages/mql2alg/MqlToAlgConverter.java +++ b/plugins/mql-language/src/main/java/org/polypheny/db/languages/mql2alg/MqlToAlgConverter.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.function.BiFunction; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.Getter; import org.bson.BsonArray; import org.bson.BsonBoolean; @@ -217,6 +218,7 @@ public class MqlToAlgConverter { operators.add( "$all" ); operators.add( "$elemMatch" ); operators.add( "$size" ); + operators.add( "$$ROOT" ); } @@ -260,7 +262,8 @@ public AlgRoot convert( Node query, QueryParameters parameters ) { this.parameters = (MqlQueryParameters) parameters; this.defaultDatabase = ((MqlQueryParameters) parameters).getDatabase(); if ( query instanceof MqlCollectionStatement ) { - return convert( (MqlCollectionStatement) query ); + AlgNode scanNode = createScan( (MqlCollectionStatement) query ); + return convert( (MqlCollectionStatement) query, scanNode ); } throw new RuntimeException( "DML or DQL need a collection" ); } @@ -270,25 +273,15 @@ public AlgRoot convert( Node query, QueryParameters parameters ) { * Converts the initial MongoQl by stepping through it iteratively * * @param query the query in MqlNode format + * @param input the input that should be queried * @return the {@link AlgNode} format of the initial query */ - public AlgRoot convert( MqlCollectionStatement query ) { + public AlgRoot convert( MqlCollectionStatement query, AlgNode input ) { Type kind = query.getMqlKind(); - this.entity = getEntity( query, defaultDatabase ); - if ( entity == null ) { - throw new RuntimeException( "The used collection does not exist." 
); - } - - AlgNode node; - if ( entity.getTable().getSchemaType() == NamespaceType.RELATIONAL ) { - _dataExists = false; - } - - node = LogicalDocumentScan.create( cluster, entity ); this.usesDocumentModel = true; - AlgDataType rowType = entity.getRowType(); + AlgDataType rowType = input.getRowType(); this.builder = new RexBuilder( cluster.getTypeFactory() ); @@ -296,15 +289,15 @@ public AlgRoot convert( MqlCollectionStatement query ) { switch ( kind ) { case FIND: - AlgNode find = convertFind( (MqlFind) query, rowType, node ); + AlgNode find = convertFind( (MqlFind) query, rowType, input ); root = AlgRoot.of( find, find.getRowType(), Kind.SELECT ); break; case COUNT: - AlgNode count = convertCount( (MqlCount) query, rowType, node ); + AlgNode count = convertCount( (MqlCount) query, rowType, input ); root = AlgRoot.of( count, count.getRowType(), Kind.SELECT ); break; case AGGREGATE: - AlgNode aggregate = convertAggregate( (MqlAggregate) query, rowType, node ); + AlgNode aggregate = convertAggregate( (MqlAggregate) query, rowType, input ); root = AlgRoot.of( aggregate, Kind.SELECT ); break; /// dmls @@ -313,10 +306,10 @@ public AlgRoot convert( MqlCollectionStatement query ) { break; case DELETE: case FIND_DELETE: - root = AlgRoot.of( convertDelete( (MqlDelete) query, entity, node ), Kind.DELETE ); + root = AlgRoot.of( convertDelete( (MqlDelete) query, entity, input ), Kind.DELETE ); break; case UPDATE: - root = AlgRoot.of( convertUpdate( (MqlUpdate) query, entity, node ), Kind.UPDATE ); + root = AlgRoot.of( convertUpdate( (MqlUpdate) query, entity, input ), Kind.UPDATE ); break; default: throw new IllegalStateException( "Unexpected value: " + kind ); @@ -328,6 +321,20 @@ public AlgRoot convert( MqlCollectionStatement query ) { } + private AlgNode createScan( MqlCollectionStatement query ) { + this.entity = getEntity( query, defaultDatabase ); + if ( entity == null ) { + throw new RuntimeException( "The used collection does not exist." ); + } + + if ( entity.getTable().getSchemaType() == NamespaceType.RELATIONAL ) { + _dataExists = false; + } + + return LogicalDocumentScan.create( cluster, entity ); + } + + private AlgOptTable getEntity( MqlCollectionStatement query, String dbSchemaName ) { List names = ImmutableList.of( dbSchemaName, query.getCollection() ); @@ -1479,6 +1486,8 @@ private RexNode convertEntry( String key, String parentKey, BsonValue bsonValue, return convertElemMatch( bsonValue, parentKey, rowType ); } else if ( key.equals( "$size" ) ) { return convertSize( bsonValue, parentKey, rowType ); + } else if ( key.equals( "$$ROOT" ) ) { + return convertField( parentKey == null ? key : parentKey + "." 
+ key, bsonValue, rowType ); } return translateLogical( key, parentKey, bsonValue, rowType ); } diff --git a/plugins/mqtt-stream/build.gradle b/plugins/mqtt-stream/build.gradle new file mode 100644 index 0000000000..31e4f4450b --- /dev/null +++ b/plugins/mqtt-stream/build.gradle @@ -0,0 +1,105 @@ +group "org.polypheny" + +configurations { + tests { + extendsFrom testRuntimeOnly + } +} + +dependencies { + compileOnly project(":core") + compileOnly project(":dbms") + compileOnly project(":monitoring") + compileOnly project(":plugins:mql-language") + + // https://mvnrepository.com/artifact/com.hivemq/hivemq-mqtt-client + implementation group: 'com.hivemq', name: 'hivemq-mqtt-client', version: hivemq_mqttclient_version + + implementation group: "org.mongodb", name: "mongodb-driver-sync", version: mongodb_driver_sync_version // Apache 2.0 + + // --- Test Compile --- + testImplementation project(path: ":dbms", configuration: "test") + testImplementation project(path: ":dbms") + testImplementation project(path: ":core", configuration: "tests") + testImplementation project(path: ":core") + testImplementation project(":plugins:mql-language") + testCompileOnly group: 'org.pf4j', name: 'pf4j', version: pf4jVersion + testImplementation group: "junit", name: "junit", version: junit_version + testImplementation group: "org.mockito", name: "mockito-core", version: mockito_core_version + + // --- Test Compile --- + +} + + +sourceSets { + main { + java { + srcDirs = ["src/main/java", "build/generated-sources"] + outputDir = file(project.buildDir.absolutePath + "/classes") + } + resources { + srcDirs = ["src/main/resources"] + exclude "version/*.properties" + } + output.resourcesDir = file(project.buildDir.absolutePath + "/classes") + } + test { + java { + srcDirs = ["src/test/java"] + outputDir = file(project.buildDir.absolutePath + "/test-classes") + } + resources { + // We need the main resources for the tests as well. + srcDirs = ["src/test/resources", "src/main/resources"] + } + output.resourcesDir = file(project.buildDir.absolutePath + "/test-classes") + } +} + +task mqttTests(type: Test) { + description = 'Runs MQTT client tests.' + group = 'verification' + useJUnit { + includeCategories 'org.polypheny.db.mqtt.MqttClientBrokerTest' + } + shouldRunAfter(tasks.named('test')) +} +mqttTests.dependsOn(testClasses) + +compileJava { + dependsOn(":config:processResources") + dependsOn(":core:processResources") + dependsOn(":information:processResources") + dependsOn(":monitoring:processResources") + dependsOn(":plugins:mql-language:processResources") +} + +delombok { + dependsOn(":monitoring:processResources") +} + + +/** + * JARs + */ +jar { + manifest { + attributes "Manifest-Version": "1.0" + attributes "Copyright": "The Polypheny Project (polypheny.org)" + attributes "Version": "$project.version" + } +} + +java { + withJavadocJar(); + withSourcesJar(); +} + +licensee { + allow('Apache-2.0') + allowUrl('http://www.apache.org/license/LICENSE-2.0.txt') + allow('MIT-0') + + +} diff --git a/plugins/mqtt-stream/gradle.properties b/plugins/mqtt-stream/gradle.properties new file mode 100644 index 0000000000..888b2d1806 --- /dev/null +++ b/plugins/mqtt-stream/gradle.properties @@ -0,0 +1,27 @@ +# +# Copyright 2019-2023 The Polypheny Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pluginVersion = 0.0.1 + +pluginId = mqtt-stream +pluginClass = org.polypheny.db.mqtt.MqttStreamPlugin +pluginProvider = The Polypheny Project +pluginDependencies = mql-language +pluginUrlPath = +pluginCategories = interface +pluginPolyDependencies = +pluginIsSystemComponent = false +pluginIsUiVisible = true \ No newline at end of file diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/FilteringMqttMessage.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/FilteringMqttMessage.java new file mode 100644 index 0000000000..474f45d229 --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/FilteringMqttMessage.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.mqtt; + +import lombok.Getter; + +public class FilteringMqttMessage { + + private MqttMessage mqttMessage; + @Getter + private String query; + + + public FilteringMqttMessage( MqttMessage mqttMessage, String query ) { + this.mqttMessage = mqttMessage; + this.query = query; + } + + + public String getMessage() { + return mqttMessage.getData(); + } + +} diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttMessage.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttMessage.java new file mode 100644 index 0000000000..614d6f1aff --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttMessage.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.mqtt; + +import lombok.Getter; +import org.polypheny.db.stream.StreamMessage; + +public class MqttMessage implements StreamMessage { + + + final String payload; + @Getter + final String topic; + + + public MqttMessage( String payload, String topic ) { + this.payload = payload; + this.topic = topic; + } + + + public String getData() { + return this.payload; + } + +} diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java new file mode 100644 index 0000000000..627b74312b --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamPlugin.java @@ -0,0 +1,833 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.mqtt; + + +import com.google.common.collect.ImmutableList; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Extension; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; +import org.polypheny.db.adapter.DataStore; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.Catalog.NamespaceType; +import org.polypheny.db.catalog.Catalog.Pattern; +import org.polypheny.db.catalog.Catalog.PlacementType; +import org.polypheny.db.catalog.entity.CatalogCollection; +import org.polypheny.db.catalog.entity.CatalogSchema; +import org.polypheny.db.catalog.exceptions.EntityAlreadyExistsException; +import org.polypheny.db.catalog.exceptions.GenericCatalogException; +import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException; +import org.polypheny.db.catalog.exceptions.UnknownDatabaseException; +import org.polypheny.db.catalog.exceptions.UnknownSchemaException; +import org.polypheny.db.catalog.exceptions.UnknownSchemaIdRuntimeException; +import org.polypheny.db.catalog.exceptions.UnknownUserException; +import org.polypheny.db.ddl.DdlManager; +import org.polypheny.db.iface.Authenticator; +import org.polypheny.db.iface.QueryInterface; +import org.polypheny.db.iface.QueryInterfaceManager; +import org.polypheny.db.information.InformationAction; +import org.polypheny.db.information.InformationGroup; +import org.polypheny.db.information.InformationKeyValue; +import org.polypheny.db.information.InformationManager; +import org.polypheny.db.information.InformationPage; +import org.polypheny.db.information.InformationTable; +import 
org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionException; +import org.polypheny.db.transaction.TransactionManager; + + +public class MqttStreamPlugin extends Plugin { + + + /** + * Constructor to be used by plugin manager for plugin instantiation. + * Your plugins have to provide constructor with this exact signature to be successfully loaded by manager. + */ + public MqttStreamPlugin( PluginWrapper wrapper ) { + super( wrapper ); + } + + + @Override + public void start() { + // Add MQTT stream + Map mqttDefaultSettings = new HashMap<>(); + mqttDefaultSettings.put( "brokerAddress", "localhost" ); + mqttDefaultSettings.put( "brokerPort", "1883" ); + mqttDefaultSettings.put( "Tsl/SslConnection", "false" ); + mqttDefaultSettings.put( "namespace", "default" ); + mqttDefaultSettings.put( "namespaceType", "DOCUMENT" ); + mqttDefaultSettings.put( "catchAllEntity", "false" ); + mqttDefaultSettings.put( "catchAllEntityName", "" ); + mqttDefaultSettings.put( "Query Interface Name", "mqtt" ); + QueryInterfaceManager.addInterfaceType( "mqtt", MqttStreamClient.class, mqttDefaultSettings ); + } + + + @Override + public void stop() { + QueryInterfaceManager.removeInterfaceType( MqttStreamClient.class ); + } + + + @Slf4j + @Extension + public static class MqttStreamClient extends QueryInterface { + + @SuppressWarnings("WeakerAccess") + public static final String INTERFACE_NAME = "MQTT Interface"; + @SuppressWarnings("WeakerAccess") + public static final String INTERFACE_DESCRIPTION = "Connection establishment to a MQTT broker."; + @SuppressWarnings("WeakerAccess") + public static final List AVAILABLE_SETTINGS = ImmutableList.of( + new QueryInterfaceSettingString( "brokerAddress", false, true, false, null ), + new QueryInterfaceSettingInteger( "brokerPort", false, true, false, null ), + new QueryInterfaceSettingString( "namespace", false, true, true, null ), + // "RELATIONAL", "GRAPH" types are not supported yet. + new QueryInterfaceSettingList( "namespaceType", false, true, false, + new ArrayList<>( List.of( "DOCUMENT" ) ) ), + new QueryInterfaceSettingList( "catchAllEntity", false, true, true, new ArrayList<>( List.of( "TRUE", "FALSE" ) ) ), + new QueryInterfaceSettingString( "catchAllEntityName", true, false, true, null ), + new QueryInterfaceSettingString( "topics", false, true, true, null ), + new QueryInterfaceSettingString( "filterQuery", true, false, true, "" ) ); + + @Getter + private final String brokerAddress; + @Getter + private final int brokerPort; + /** + * topicsMap: Contains all subscribed topics as the key and the received number of messages with this topic. + */ + @Getter + private Map topicsMap = new ConcurrentHashMap<>(); + /** + * filterMap: Contains the filter query for a topic. The key is the topic. 
+ */ + @Getter + private Map filterMap = new ConcurrentHashMap<>(); + @Getter + private ConcurrentLinkedQueue messageQueue = new ConcurrentLinkedQueue<>(); + private Mqtt5AsyncClient client; + @Getter + private String namespaceName; + @Getter + private NamespaceType namespaceType; + @Getter + private AtomicBoolean catchAllEntity; + @Getter + private String catchAllEntityName; + private final long databaseId; + private final int userId; + private final Object settingsLock = new Object(); + private final MonitoringPage monitoringPage; + + + public MqttStreamClient( TransactionManager transactionManager, Authenticator authenticator, int ifaceId, String uniqueName, Map settings ) { + super( transactionManager, authenticator, ifaceId, uniqueName, settings, true, false ); + // Add information page + this.monitoringPage = new MonitoringPage(); + this.brokerAddress = settings.get( "brokerAddress" ).trim(); + this.brokerPort = Integer.parseInt( settings.get( "brokerPort" ).trim() ); + this.databaseId = Catalog.defaultDatabaseId; + this.userId = Catalog.defaultUserId; + + String name = settings.get( "namespace" ).trim(); + NamespaceType type = NamespaceType.valueOf( settings.get( "namespaceType" ) ); + if ( type != NamespaceType.DOCUMENT ) { + throw new RuntimeException( "Namespace types other than the DOCUMENT type are not yet supported." ); + } + if ( !namespaceExists( name, type ) ) { + createNamespace( name, type ); + } + this.namespaceName = name; + this.namespaceType = type; + + this.catchAllEntity = new AtomicBoolean( Boolean.parseBoolean( settings.get( "catchAllEntity" ) ) ); + this.catchAllEntityName = settings.get( "catchAllEntityName" ) == null ? + settings.get( "catchAllEntityName" ) : settings.get( "catchAllEntityName" ) + .trim() + .replace( '#', '_' ) + .replace( '+', '_' ) + .replace( '/', '_' ); + if ( this.catchAllEntity.get() ) { + if ( this.catchAllEntityName == null || this.catchAllEntityName.isEmpty() || this.catchAllEntityName.isBlank() ) { + throw new NullPointerException( "catchAllEntity is set to true but no valid entity name was given! Please enter a entity name." ); + } else if ( !entityExists( this.catchAllEntityName ) ) { + createEntity( this.catchAllEntityName ); + } + } else if ( settings.get( "topics" ) != null ) { + for ( String topic : toList( settings.get( "topics" ) ) ) { + topic = topic.replace( '#', '_' ) + .replace( '+', '_' ) + .replace( '/', '_' ); + if ( !this.catchAllEntity.get() && !entityExists( topic ) ) { + createEntity( topic ); + } + } + } + String queryString = settings.get( "filterQuery" ); + if ( queryString != null && !queryString.isBlank() ) { + saveQueriesInMap( queryString ); + } + + } + + + @Override + public void run() { + + this.client = MqttClient.builder().useMqttVersion5() + .identifier( getUniqueName() ) + .serverHost( brokerAddress ) + .serverPort( brokerPort ) + .automaticReconnectWithDefaultConfig() + .buildAsync(); + + client.connectWith().send().whenComplete( ( connAck, throwable ) -> { + if ( throwable != null ) { + throw new RuntimeException( "Connection to broker could not be established. 
Try to reconnect with the 'Reconnect' button.", throwable );
+ } else {
+ log.info( "{} started and is listening to broker on {}:{}", INTERFACE_NAME, brokerAddress, brokerPort );
+ subscribe( toList( this.settings.get( "topics" ) ) );
+ }
+ } );
+
+ }
+
+
+ @Override
+ public List getAvailableSettings() {
+ return AVAILABLE_SETTINGS;
+ }
+
+
+ @Override
+ public void shutdown() {
+
+ client.disconnect().whenComplete( ( disconn, throwable ) -> {
+ if ( throwable != null ) {
+ throw new RuntimeException( INTERFACE_NAME + " could not disconnect from MQTT broker " + brokerAddress + ":" + brokerPort + ". Please try again.", throwable );
+ } else {
+ log.info( "{} stopped.", INTERFACE_NAME );
+ monitoringPage.remove();
+ }
+ } );
+
+ }
+
+
+ private boolean namespaceExists( String namespaceName, NamespaceType namespaceType ) {
+ Catalog catalog = Catalog.getInstance();
+ if ( catalog.checkIfExistsSchema( Catalog.defaultDatabaseId, namespaceName ) ) {
+ // Return value is ignored here; the call only validates that the existing namespace has the expected type.
+ getExistingNamespaceId( namespaceName, namespaceType );
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+
+ private long getNamespaceId( String namespaceName, NamespaceType namespaceType ) {
+ Catalog catalog = Catalog.getInstance();
+ if ( catalog.checkIfExistsSchema( Catalog.defaultDatabaseId, namespaceName ) ) {
+ return getExistingNamespaceId( namespaceName, namespaceType );
+ } else {
+ return createNamespace( namespaceName, namespaceType );
+ }
+ }
+
+
+ private long getExistingNamespaceId( String namespaceName, NamespaceType namespaceType ) {
+ Catalog catalog = Catalog.getInstance();
+ CatalogSchema schema;
+ try {
+ schema = catalog.getSchema( Catalog.defaultDatabaseId, namespaceName );
+ } catch ( UnknownSchemaException e ) {
+ throw new RuntimeException( e );
+ }
+ assert schema != null;
+ if ( schema.namespaceType == namespaceType ) {
+ return schema.id;
+ } else {
+ throw new RuntimeException( "A namespace with this name already exists in this database but has a different type. Please change the namespace name or the type."
); + } + } + + + private long createNamespace( String namespaceName, NamespaceType namespaceType ) { + Catalog catalog = Catalog.getInstance(); + long id = catalog.addNamespace( namespaceName, Catalog.defaultDatabaseId, Catalog.defaultUserId, namespaceType ); + try { + catalog.commit(); + return id; + } catch ( NoTablePrimaryKeyException e ) { + throw new RuntimeException( e ); + } + } + + + @Override + protected void reloadSettings( List updatedSettings ) { + synchronized ( settingsLock ) { + + for ( String changedSetting : updatedSettings ) { + switch ( changedSetting ) { + case "topics": + List newTopicsList = toList( this.getCurrentSettings().get( "topics" ) ); + List topicsToSub = new ArrayList<>(); + for ( String newTopic : newTopicsList ) { + if ( !topicsMap.containsKey( newTopic ) ) { + topicsToSub.add( newTopic ); + } + } + if ( !topicsToSub.isEmpty() ) { + subscribe( topicsToSub ); + } + List topicsToUnsub = new ArrayList<>(); + for ( String oldTopic : topicsMap.keySet() ) { + if ( !newTopicsList.contains( oldTopic ) ) { + topicsToUnsub.add( oldTopic ); + } + } + if ( !topicsToUnsub.isEmpty() ) { + unsubscribe( topicsToUnsub ); + } + break; + + case "namespace": + String newNamespaceName = this.getCurrentSettings().get( "namespace" ).trim(); + if ( updatedSettings.contains( "namespaceType" ) ) { + if ( updatedSettings.indexOf( "namespaceType" ) < updatedSettings.indexOf( "namespace" ) ) { + NamespaceType type = NamespaceType.valueOf( this.getCurrentSettings().get( "namespaceType" ) ); + try { + if ( !namespaceExists( newNamespaceName, type ) ) { + createNamespace( newNamespaceName, type ); + } + this.namespaceName = newNamespaceName; + this.namespaceType = type; + createAllEntities(); + } catch ( RuntimeException e ) { + this.settings.put( "namespace", this.namespaceName ); + this.settings.put( "namespaceType", String.valueOf( this.namespaceType ) ); + throw new RuntimeException( e ); + } + } // else checking for namespace happens in case "namespaceType" + } else { + try { + if ( !namespaceExists( newNamespaceName, this.namespaceType ) ) { + createNamespace( newNamespaceName, this.namespaceType ); + } + this.namespaceName = newNamespaceName; + createAllEntities(); + } catch ( RuntimeException e ) { + this.settings.put( "namespace", this.namespaceName ); + throw new RuntimeException( e ); + } + } + break; + case "namespaceType": + NamespaceType newNamespaceType = NamespaceType.valueOf( this.getCurrentSettings().get( "namespaceType" ) ); + if ( updatedSettings.contains( "namespace" ) ) { + if ( updatedSettings.indexOf( "namespace" ) < updatedSettings.indexOf( "namespaceType" ) ) { + String newName = this.getCurrentSettings().get( "namespace" ); + try { + if ( !namespaceExists( newName, newNamespaceType ) ) { + createNamespace( newName, newNamespaceType ); + } + this.namespaceName = newName; + this.namespaceType = newNamespaceType; + createAllEntities(); + } catch ( RuntimeException e ) { + this.settings.put( "namespace", this.namespaceName ); + this.settings.put( "namespaceType", String.valueOf( this.namespaceType ) ); + throw new RuntimeException( e ); + } + } // else checking for namespace happens in case "namespace" + } else { + try { + if ( !namespaceExists( this.namespaceName, newNamespaceType ) ) { + createNamespace( this.namespaceName, newNamespaceType ); + } + this.namespaceType = newNamespaceType; + createAllEntities(); + } catch ( RuntimeException e ) { + this.settings.put( "namespaceType", String.valueOf( this.namespaceType ) ); + throw new RuntimeException( e ); + } + } 
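+ // Note: the "namespace" and "namespaceType" cases mirror each other. If both settings change in the same update,
+ // only the case of the setting that appears later in updatedSettings performs the combined existence check,
+ // so the namespace is validated exactly once against its new type.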
+ break;
+ case "catchAllEntity":
+ this.catchAllEntity.set( Boolean.parseBoolean( this.getCurrentSettings().get( "catchAllEntity" ) ) );
+ createAllEntities();
+ break;
+ case "catchAllEntityName":
+ String newcatchAllEntityName = this.getCurrentSettings().get( "catchAllEntityName" );
+ newcatchAllEntityName = newcatchAllEntityName == null ? null : newcatchAllEntityName.trim().replace( '#', '_' ).replace( '+', '_' ).replace( '/', '_' );
+ boolean mode;
+ if ( updatedSettings.contains( "catchAllEntity" ) ) {
+ mode = Boolean.parseBoolean( this.getCurrentSettings().get( "catchAllEntity" ) );
+ } else {
+ mode = this.catchAllEntity.get();
+ }
+ if ( mode ) {
+ if ( newcatchAllEntityName != null && !newcatchAllEntityName.equals( "null" ) && !newcatchAllEntityName.isBlank() ) {
+ if ( !entityExists( newcatchAllEntityName ) ) {
+ createEntity( newcatchAllEntityName );
+ }
+ this.catchAllEntityName = newcatchAllEntityName;
+ createAllEntities();
+ } else {
+ this.settings.put( "catchAllEntityName", this.catchAllEntityName );
+ throw new NullPointerException( "catchAllEntity is set to TRUE but no valid entity name was given! Please enter an entity name." );
+ }
+
+ } else {
+ this.catchAllEntityName = newcatchAllEntityName;
+ }
+ break;
+ case "filterQuery":
+ String queryString = this.getCurrentSettings().get( "filterQuery" );
+ filterMap.clear();
+ saveQueriesInMap( queryString );
+ break;
+ }
+ }
+ }
+ }
+
+
+ protected void subscribe( List newTopics ) {
+ for ( String t : newTopics ) {
+ subscribe( t );
+ }
+ }
+
+
+ /**
+ * Subscribes to one given topic and adds it to the topicsMap.
+ *
+ * @param topic the topic the client should subscribe to.
+ */
+ private void subscribe( String topic ) {
+ client.subscribeWith().topicFilter( topic )
+ .callback( this::processMsg )
+ .send()
+ .whenComplete( ( subAck, throwable ) -> {
+ if ( throwable != null ) {
+ throw new RuntimeException( String.format( "Subscription was not successful for topic \"%s\". Please try again.", topic ), throwable );
+ } else {
+ this.topicsMap.put( topic, new AtomicLong( 0 ) );
+ }
+ } );
+ }
+
+
+ protected void unsubscribe( List topics ) {
+ for ( String t : topics ) {
+ unsubscribe( t );
+ }
+ }
+
+
+ private void unsubscribe( String topic ) {
+ client.unsubscribeWith().topicFilter( topic ).send().whenComplete( ( unsub, throwable ) -> {
+ if ( throwable != null ) {
+ synchronized ( settingsLock ) {
+ this.settings.put( "topics", this.settings.get( "topics" ) + "," + topic );
+ }
+ throw new RuntimeException( "Topic " + topic + " could not be unsubscribed.", throwable );
+ } else {
+ this.topicsMap.remove( topic );
+ }
+ } );
+ }
+
+
+ /**
+ * Format of the queries, comma separated: <topic1>:<query1>, <topic2>:<query2>, ...
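+ * Example (as used in MqttStreamClientTest): topic1:{"key1":"value1"}, topic2:{"key2":"value2"}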
+ * + * @param queries + */ + private void saveQueriesInMap( String queries ) { + Stack brackets = new Stack<>(); + String query; + while ( !queries.isBlank() ) { + int index = 0; + String topic = queries.substring( 0, queries.indexOf( ":" ) ); + queries = queries.substring( queries.indexOf( ":" ) + 1 ); + if ( topic.startsWith( "," ) || topic.startsWith( " ," ) ) { + topic = topic.replaceFirst( ",", "" ).trim(); + } + + while ( !queries.isBlank() ) { + char c = queries.charAt( index ); + if ( c == '{' ) { + brackets.push( c ); + index++; + } else if ( c == '}' ) { + if ( brackets.pop().equals( '{' ) ) { + if ( brackets.isEmpty() ) { + query = queries.substring( 0, index + 1 ).trim(); + if ( this.filterMap.containsKey( topic ) ) { + if ( !this.filterMap.get( topic ).equals( query ) ) { + this.filterMap.replace( topic, query ); + } + } else { + this.filterMap.put( topic, query ); + } + queries = queries.substring( index + 1 ); + break; + } + } else { + throw new RuntimeException( String.format( "The brackets in the query to the topic %s are not set correctly!", topic ) ); + } + } + if ( index < queries.toCharArray().length ) { + index++; + } + } + } + } + + + protected void processMsg( Mqtt5Publish subMsg ) { + Transaction transaction = getTransaction(); + Statement statement = transaction.createStatement(); + + String topic = subMsg.getTopic().toString(); + String message = extractPayload( subMsg ); + addMessageToQueue( topic, message ); + MqttMessage mqttMessage = new MqttMessage( message, topic ); + + String wildcardTopic = ""; + if ( !topicsMap.containsKey( topic ) ) { + wildcardTopic = getWildcardTopic( topic ); + topicsMap.get( wildcardTopic ).incrementAndGet(); + } else { + topicsMap.get( topic ).incrementAndGet(); + } + + if ( this.filterMap.containsKey( topic ) ) { + String filterQuery = this.filterMap.get( topic ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, statement ); + // false is returned when a message should not be stored in DB + if ( streamProcessor.applyFilter() ) { + insertInEntity( mqttMessage, transaction ); + } + } else if ( !wildcardTopic.isEmpty() && this.filterMap.containsKey( wildcardTopic ) ) { + String filterQuery = this.filterMap.get( wildcardTopic ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, statement ); + if ( streamProcessor.applyFilter() ) { + insertInEntity( mqttMessage, transaction ); + } + } else { + insertInEntity( mqttMessage, transaction ); + } + } + + + private void insertInEntity( MqttMessage mqttMessage, Transaction transaction ) { + StoringMqttMessage storingMqttMessage; + synchronized ( settingsLock ) { + if ( !this.catchAllEntity.get() ) { + String entityName; + entityName = mqttMessage.getTopic().replace( '#', '_' ).replace( '+', '_' ).replace( '/', '_' ); + storingMqttMessage = new StoringMqttMessage( mqttMessage, this.namespaceName, this.namespaceType, getUniqueName(), this.databaseId, this.userId, entityName ); + } else { + storingMqttMessage = new StoringMqttMessage( mqttMessage, this.namespaceName, this.namespaceType, getUniqueName(), this.databaseId, this.userId, this.catchAllEntityName ); + } + } + StreamCapture streamCapture = new StreamCapture( transaction ); + streamCapture.insert( storingMqttMessage ); + } + + + protected static String extractPayload( 
Mqtt5Publish subMsg ) { + return new String( subMsg.getPayloadAsBytes(), Charset.defaultCharset() ); + } + + + protected String getWildcardTopic( String topic ) { + for ( String t : topicsMap.keySet() ) { + //check for multilevel wildcard + if ( t.contains( "#" ) && topic.startsWith( t.substring( 0, t.indexOf( "#" ) ) ) ) { + return t; + + } + // check for single level wildcard + if ( t.contains( "+" ) && topic.startsWith( t.substring( 0, t.indexOf( "+" ) ) ) && topic.endsWith( t.substring( t.indexOf( "+" ) + 1 ) ) ) { + return t; + } + } + return topic; + } + + + protected void addMessageToQueue( String topic, String message ) { + if ( this.messageQueue.size() >= 20 ) { + this.messageQueue.poll(); + this.messageQueue.add( new String[]{ topic, message } ); + } else { + this.messageQueue.add( new String[]{ topic, message } ); + } + } + + + /** + * separates a string by commas and inserts the separated parts to a list. + * + * @param string List of Strings seperated by comma without brackets as a String (entry form UI) + * @return List of seperated string values + */ + protected List toList( String string ) { + List list = new ArrayList<>( List.of( string.split( "," ) ) ); + for ( int i = 0; i < list.size(); i++ ) { + String topic = list.get( i ).trim(); + if ( !topic.isBlank() || !topic.isEmpty() ) { + list.set( i, topic ); + } else { + list.remove( i ); + } + } + return list; + } + + + /** + * @param entityName + * @return true: entity already exists, false: entity does not exist. + */ + private boolean entityExists( String entityName ) { + if ( this.namespaceType == NamespaceType.DOCUMENT ) { + String collectionName = entityName.replace( '#', '_' ).replace( '+', '_' ).replace( '/', '_' ); + Catalog catalog = Catalog.getInstance(); + Pattern pattern = new Pattern( collectionName ); + List collectionList = null; + synchronized ( settingsLock ) { + collectionList = catalog.getCollections( getNamespaceId( this.namespaceName, this.namespaceType ), pattern ); + } + return !collectionList.isEmpty(); + } else { + // handle other namespace types + return false; + } + } + + + private void createEntity( String entityName ) { + if ( this.namespaceType == NamespaceType.DOCUMENT ) { + String collectionName = entityName.replace( '#', '_' ).replace( '+', '_' ).replace( '/', '_' ); + Transaction transaction = getTransaction(); + Statement statement = transaction.createStatement(); + long namespaceID; + synchronized ( settingsLock ) { + namespaceID = getNamespaceId( this.namespaceName, this.namespaceType ); + } + try { + List dataStores = new ArrayList<>(); + DdlManager.getInstance().createCollection( + namespaceID, + collectionName, + true, + dataStores.size() == 0 ? 
null : dataStores, + PlacementType.MANUAL, + statement ); + transaction.commit(); + } catch ( EntityAlreadyExistsException | TransactionException e ) { + throw new RuntimeException( "Error while creating a new collection:", e ); + } catch ( UnknownSchemaIdRuntimeException e3 ) { + throw new RuntimeException( "New collection could not be created.", e3 ); + } + } + } + + + private void createAllEntities() { + if ( this.namespaceType == NamespaceType.DOCUMENT ) { + synchronized ( settingsLock ) { + if ( !this.catchAllEntity.get() ) { + for ( String t : this.topicsMap.keySet() ) { + if ( !entityExists( t ) ) { + createEntity( t ); + } + } + } else { + if ( !(this.catchAllEntityName == null || this.catchAllEntityName.equals( "" ) || this.catchAllEntityName.isBlank()) ) { + if ( !entityExists( this.catchAllEntityName ) ) { + createEntity( this.catchAllEntityName ); + } + } else { + throw new NullPointerException( "catchAllEntity is set to 'true' but no valid entity name was given! Please enter a entity name." ); + } + } + } + } else { + // handle other namespace types + throw new RuntimeException("Other namespace types are not implemented yet"); + } + } + + + protected void publish( String topic, String payload ) { + client.publishWith() + .topic( topic ) + .payload( payload.getBytes() ) + .send(); + } + + + private Transaction getTransaction() { + try { + return transactionManager.startTransaction( this.userId, this.databaseId, false, "MQTT Stream" ); + } catch ( UnknownUserException | UnknownDatabaseException | UnknownSchemaException | GenericCatalogException e ) { + throw new RuntimeException( "Error while starting transaction", e ); + } + } + + + @Override + public void languageChange() { + + } + + + @Override + public String getInterfaceType() { + return INTERFACE_NAME; + } + + + private class MonitoringPage { + + private final InformationPage informationPage; + + private final InformationGroup informationGroupTopics; + private final InformationGroup informationGroupInfo; + private final InformationGroup informationGroupReceivedMessages; + private final InformationGroup informationGroupPub; + private final InformationTable topicsTable; + private final InformationTable messageTable; + private final InformationKeyValue brokerKv; + private final InformationAction msgButton; + + + public MonitoringPage() { + InformationManager im = InformationManager.getInstance(); + + informationPage = new InformationPage( getUniqueName(), INTERFACE_NAME ).setLabel( "Interfaces" ); + informationPage.setRefreshFunction( this::update ); + im.addPage( informationPage ); + + informationGroupInfo = new InformationGroup( informationPage, "Information" ).setOrder( 1 ); + im.addGroup( informationGroupInfo ); + brokerKv = new InformationKeyValue( informationGroupInfo ); + im.registerInformation( brokerKv ); + + informationGroupTopics = new InformationGroup( informationPage, "Subscribed Topics" ).setOrder( 2 ); + im.addGroup( informationGroupTopics ); + topicsTable = new InformationTable( informationGroupTopics, List.of( "Topic", "Number of received messages" ) ); + im.registerInformation( topicsTable ); + + informationGroupReceivedMessages = new InformationGroup( informationPage, "Recently received messages" ).setOrder( 2 ); + im.addGroup( informationGroupReceivedMessages ); + messageTable = new InformationTable( informationGroupReceivedMessages, List.of( "Topic", "Message" ) ); + im.registerInformation( messageTable ); + + informationGroupPub = new InformationGroup( informationPage, "Publish a message" ).setOrder( 3 
); + im.addGroup( informationGroupPub ); + msgButton = new InformationAction( informationGroupPub, "Publish", ( parameters ) -> { + String end = "Message was published!"; + + try { + publish( parameters.get( "topic" ), parameters.get( "payload" ) ); + } catch ( IllegalArgumentException e ) { + throw new RuntimeException( e ); + } + return end; + } ).withParameters( "topic", "payload" ); + im.registerInformation( msgButton ); + + } + + + public void update() { + + topicsTable.reset(); + if ( topicsMap.isEmpty() ) { + topicsTable.addRow( "No topic subscriptions" ); + } else { + for ( Entry t : topicsMap.entrySet() ) { + topicsTable.addRow( t.getKey(), t.getValue() ); + } + } + + messageTable.reset(); + if ( messageQueue.isEmpty() ) { + messageTable.addRow( "No messages received yet." ); + } else { + for ( String[] message : messageQueue ) { + messageTable.addRow( List.of( message ) ); + } + } + + brokerKv.putPair( "Broker address", client.getConfig().getServerHost() ); + brokerKv.putPair( "Broker port", client.getConfig().getServerPort() + "" ); + brokerKv.putPair( "Broker version of MQTT", client.getConfig().getMqttVersion() + "" ); + brokerKv.putPair( "Client state", client.getState() + "" ); + brokerKv.putPair( "Client identifier", client.getConfig().getClientIdentifier().get() + "" ); + } + + + public void remove() { + InformationManager im = InformationManager.getInstance(); + im.removeInformation( topicsTable ); + im.removeInformation( brokerKv ); + im.removeInformation( messageTable ); + im.removeInformation( msgButton ); + + im.removeGroup( informationGroupTopics ); + im.removeGroup( informationGroupInfo ); + im.removeGroup( informationGroupPub ); + im.removeGroup( informationGroupReceivedMessages ); + im.removePage( informationPage ); + } + + } + + } + +} + diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java new file mode 100644 index 0000000000..131846b6a8 --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/MqttStreamProcessor.java @@ -0,0 +1,91 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.mqtt; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonString; +import org.polypheny.db.algebra.AlgNode; +import org.polypheny.db.algebra.AlgRoot; +import org.polypheny.db.algebra.logical.document.LogicalDocumentValues; +import org.polypheny.db.languages.QueryLanguage; +import org.polypheny.db.languages.mql.MqlFind; +import org.polypheny.db.languages.mql2alg.MqlToAlgConverter; +import org.polypheny.db.plan.AlgOptCluster; +import org.polypheny.db.prepare.PolyphenyDbCatalogReader; +import org.polypheny.db.processing.Processor; +import org.polypheny.db.stream.StreamProcessorImpl; +import org.polypheny.db.tools.AlgBuilder; +import org.polypheny.db.transaction.Statement; + +@Slf4j +public class MqttStreamProcessor extends StreamProcessorImpl { + + private final String filterQuery; + private final Statement statement; + + + public MqttStreamProcessor( FilteringMqttMessage filteringMqttMessage, Statement statement ) { + super( filteringMqttMessage.getMessage() ); + this.filterQuery = filteringMqttMessage.getQuery(); + this.statement = statement; + } + + + public boolean applyFilter() { + AlgRoot root = processMqlQuery(); + List> res = executeAndTransformPolyAlg( root, statement ); + return res.size() != 0; + } + + + private AlgRoot processMqlQuery() { + AlgBuilder algBuilder = AlgBuilder.create( this.statement ); + Processor mqlProcessor = statement.getTransaction().getProcessor( QueryLanguage.from( "mongo" ) ); + PolyphenyDbCatalogReader catalogReader = statement.getTransaction().getCatalogReader(); + final AlgOptCluster cluster = AlgOptCluster.createDocument( statement.getQueryProcessor().getPlanner(), algBuilder.getRexBuilder() ); + MqlToAlgConverter mqlConverter = new MqlToAlgConverter( mqlProcessor, catalogReader, cluster ); + + MqlFind find = (MqlFind) mqlProcessor.parse( String.format( "db.%s.find(%s)", "collection", this.filterQuery ) ).get( 0 ); + String msg = getStream(); + BsonDocument msgDoc; + AlgNode input; + if ( msg.contains( "{" ) && msg.contains( "}" ) ) { + // msg is in JSON format + msgDoc = BsonDocument.parse( msg ); + } else if ( msg.contains( "[" ) && msg.contains( "]" ) ) { + // msg is an array + msgDoc = BsonDocument.parse( "{\"$$ROOT\":" + msg + "}" ); + } else if ( isNumber( msg ) ) { + double value = Double.parseDouble( msg ); + msgDoc = new BsonDocument( "$$ROOT", new BsonDouble( value ) ); + } else if ( isBoolean( msg ) ) { + boolean value = Boolean.parseBoolean( msg ); + msgDoc = new BsonDocument( "$$ROOT", new BsonBoolean( value ) ); + } else { + // msg is String + msgDoc = new BsonDocument( "$$ROOT", new BsonString( msg ) ); + } + input = LogicalDocumentValues.create( cluster, ImmutableList.of( msgDoc ) ); + return mqlConverter.convert( find, input ); + } + +} \ No newline at end of file diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StoringMqttMessage.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StoringMqttMessage.java new file mode 100644 index 0000000000..8fb45c8861 --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StoringMqttMessage.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.mqtt; + +import lombok.Getter; +import org.polypheny.db.catalog.Catalog.NamespaceType; + + +public class StoringMqttMessage { + + private final MqttMessage msg; + @Getter + private final String namespaceName; + @Getter + private final NamespaceType namespaceType; + @Getter + private final String uniqueNameOfInterface; + @Getter + private final long databaseId; + @Getter + private final int userId; + + // The name of the entity where the message should be stored in. + @Getter + private final String entityName; + + + public StoringMqttMessage( MqttMessage msg, String namespaceName, NamespaceType namespaceType, String uniqueNameOfInterface, long databaseId, int userId, String entityName ) { + this.msg = msg; + this.namespaceName = namespaceName; + this.namespaceType = namespaceType; + this.uniqueNameOfInterface = uniqueNameOfInterface; + this.databaseId = databaseId; + this.userId = userId; + this.entityName = entityName; + } + + + public String getMessage() { + return this.msg.getData(); + } + + + public String getTopic() { + return this.msg.getTopic(); + } + +} diff --git a/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java new file mode 100644 index 0000000000..459bc04e34 --- /dev/null +++ b/plugins/mqtt-stream/src/main/java/org/polypheny/db/mqtt/StreamCapture.java @@ -0,0 +1,167 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.mqtt; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.polypheny.db.PolyImplementation; +import org.polypheny.db.algebra.AlgNode; +import org.polypheny.db.algebra.AlgRoot; +import org.polypheny.db.algebra.constant.Kind; +import org.polypheny.db.catalog.Catalog.NamespaceType; +import org.polypheny.db.prepare.Context; +import org.polypheny.db.tools.AlgBuilder; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionException; +import org.polypheny.db.util.PolyphenyHomeDirManager; + +@Slf4j +public class StreamCapture { + + Transaction transaction; + StoringMqttMessage storingMqttMessage; + + + StreamCapture( final Transaction transaction ) { + this.transaction = transaction; + } + + + public void insert( StoringMqttMessage storingMqttMessage ) { + this.storingMqttMessage = storingMqttMessage; + insertMessage(); + } + + + private void insertMessage() { + if ( this.storingMqttMessage.getNamespaceType() == NamespaceType.DOCUMENT ) { + String sqlCollectionName = this.storingMqttMessage.getNamespaceName() + "." + this.storingMqttMessage.getEntityName(); + Statement statement = transaction.createStatement(); + AlgNode algNode = createDocument( statement, sqlCollectionName ); + AlgRoot root = AlgRoot.of( algNode, Kind.INSERT ); + List> res = executeAndTransformPolyAlg( root, statement, statement.getPrepareContext() ); + try { + transaction.commit(); + } catch ( TransactionException e ) { + throw new RuntimeException( e ); + } + } + } + + + private AlgNode createDocument( Statement statement, String sqlCollectionName) { + AlgBuilder builder = AlgBuilder.createDocumentBuilder( statement ); + + BsonDocument document = new BsonDocument(); + document.put( "source", new BsonString( this.storingMqttMessage.getUniqueNameOfInterface() ) ); + document.put( "topic", new BsonString( this.storingMqttMessage.getTopic() ) ); + String msg = this.storingMqttMessage.getMessage(); + BsonValue value; + if ( msg.contains( "{" ) && msg.contains( "}" ) ) { + value = BsonDocument.parse( msg ); + } else if ( msg.contains( "[" ) && msg.contains( "]" ) ) { + BsonArray bsonArray = new BsonArray(); + msg = msg.replace( "[", "" ).replace( "]", "" ); + String[] msglist = msg.split( "," ); + for ( String stringValue : msglist ) { + stringValue = stringValue.trim(); + bsonArray.add( getBsonValue( stringValue ) ); + } + value = bsonArray; + } else { + // msg is a single value + value = getBsonValue( msg ); + } + document.put( "payload", value ); + + AlgNode algNode = builder.docInsert( statement, sqlCollectionName, document ).build(); + return algNode; + } + + + /** + * turns one single value into the corresponding BsonValue + * @param value value that has to be casted as String + */ + protected BsonValue getBsonValue( String value ) { + if ( isInteger( value ) ) { + return new BsonInt32( Integer.parseInt( value ) ); + } else if ( isDouble( value ) ) { + return new BsonDouble( Double.parseDouble( value ) ); + } else if ( isBoolean( value ) ) { + return new BsonBoolean( Boolean.parseBoolean( value ) ); + } else { + return new BsonString( value ); + } + } + + + public boolean isDouble( String value ) { + try { + Double.parseDouble( value ); + } catch ( NumberFormatException e ) { + return 
false; + } + return true; + } + + + public boolean isInteger( String value ) { + try { + int intNumber = Integer.parseInt( value ); + double doubleNumber = Double.parseDouble( value ); + return intNumber == doubleNumber; + } catch ( NumberFormatException e ) { + return false; + } + } + + + public boolean isBoolean( String value ) { + return value.equals( "true" ) || value.equals( "false" ); + } + + + List> executeAndTransformPolyAlg( AlgRoot algRoot, Statement statement, final Context ctx ) { + + try { + PolyImplementation result = statement.getQueryProcessor().prepareQuery( algRoot, false ); + log.debug( "AlgRoot was prepared." ); + + List> rows = result.getRows( statement, -1 ); + statement.getTransaction().commit(); + return rows; + } catch ( Throwable e ) { + log.error( "Error during execution of stream capture query", e ); + try { + statement.getTransaction().rollback(); + } catch ( TransactionException transactionException ) { + log.error( "Could not rollback", e ); + } + return null; + } + } + +} diff --git a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttClientBrokerTest.java b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttClientBrokerTest.java new file mode 100644 index 0000000000..5a08b45abc --- /dev/null +++ b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttClientBrokerTest.java @@ -0,0 +1,221 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.Catalog.Pattern; +import org.polypheny.db.catalog.entity.CatalogCollection; +import org.polypheny.db.iface.QueryInterface; +import org.polypheny.db.iface.QueryInterfaceManager; +import org.polypheny.db.mqtt.MqttStreamClientTest.Helper; +import org.polypheny.db.mqtt.MqttStreamPlugin.MqttStreamClient; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionManager; + +public class MqttClientBrokerTest { + + + static TransactionManager transactionManager; + static Transaction transaction; + static Map initialSettings = new HashMap<>(); + static Map changedSettings = new HashMap<>(); + static MqttStreamClient client; + + @BeforeClass + public static void init() { + TestHelper testHelper = TestHelper.getInstance(); + transactionManager = testHelper.getTransactionManager(); + transaction = testHelper.getTransaction(); + initialSettings.clear(); + initialSettings.put( "brokerAddress", "localhost" ); + initialSettings.put( "brokerPort", "1883" ); + initialSettings.put( "catchAllEntityName", "testCollection" ); + initialSettings.put( "catchAllEntity", "true" ); + initialSettings.put( "namespace", "testNamespace" ); + initialSettings.put( "namespaceType", "DOCUMENT" ); + initialSettings.put( "topics", "button" ); + initialSettings.put( "Tsl/SslConnection", "false" ); + initialSettings.put( "filterQuery", "" ); + + QueryInterface iface = QueryInterfaceManager.getInstance().getQueryInterface( "mqtt" ); + + client = new MqttStreamClient( + transactionManager, + null, + iface.getQueryInterfaceId(), + iface.getUniqueName(), + initialSettings ); + + } + + @Before + public void resetSettings() { + + initialSettings.clear(); + initialSettings.put( "brokerAddress", "1883" ); + initialSettings.put( "brokerPort", "1883" ); + initialSettings.put( "catchAllEntityName", "testCollection" ); + initialSettings.put( "catchAllEntity", "true" ); + initialSettings.put( "namespace", "testNamespace" ); + initialSettings.put( "namespaceType", "DOCUMENT" ); + initialSettings.put( "topics", "" ); + initialSettings.put( "Tsl/SslConnection", "false" ); + initialSettings.put( "filterQuery", "" ); + + QueryInterface iface = QueryInterfaceManager.getInstance().getQueryInterface( "mqtt" ); + + client = new MqttStreamClient( + transactionManager, + null, + iface.getQueryInterfaceId(), + iface.getUniqueName(), + initialSettings ); + + changedSettings.clear(); + changedSettings.put( "catchAllEntityName", "testCollection" ); + changedSettings.put( "catchAllEntity", "true" ); + changedSettings.put( "namespace", "testNamespace" ); + changedSettings.put( "topics", "" ); + changedSettings.put( "filterQuery", "" ); + } + + + private void simulateIoTDevices() { + client.publish( "device1/online", "true" ); + client.publish( "device1/sensor/measurements", "[28,30,35 ]" ); + client.publish( "device1/sensor/measurements/unit", "C" ); + client.publish( "device1/sensor/battery", "86" ); + + client.publish( "device2/online", "true" ); + client.publish( "device2/location/info", "Basel" ); + client.publish( "device2/sensor/info", "{\"wifi\":\"networkName\", 
\"mqtt\":{\"brokerIp\":\"127.0.0.1\", \"port\":1883}, \"deviceName\":\"device2\"}" ); + } + + + @Test + public void subscribeUnsubscribeTest() { + changedSettings.replace( "topics", "device1/sensor/battery" ); + //All subscribed topics so far are unsubscribed + client.updateSettings( changedSettings ); + assertEquals( 1, client.getTopicsMap().size() ); + assertEquals( 0, client.getTopicsMap().get( "device1/sensor/battery" ).intValue() ); + simulateIoTDevices(); + assertEquals( 1, client.getTopicsMap().get( "device1/sensor/battery" ).intValue() ); + assertTrue( client.getMessageQueue().contains( new String[]{ "device1/sensor/battery", "86" } ) ); + } + + + @Test + public void topicMapUpdatedCorrectlyWithWildcardHashtagTest() { + changedSettings.replace( "topics", "#" ); + client.updateSettings( changedSettings ); + simulateIoTDevices(); + assertEquals( 7, client.getTopicsMap().get( "#" ).intValue() ); + } + + + @Test + public void topicMapUpdatedCorrectlyWildcardHashtagAtEndTest() { + changedSettings.replace( "topics", "device1/#" ); + client.updateSettings( changedSettings ); + simulateIoTDevices(); + assertEquals( 4, client.getTopicsMap().get( "device1/#" ).intValue() ); + } + + + @Test + public void topicMapUpdatedCorrectlyWithWildcardPlusAtEndTest() { + changedSettings.replace( "topics", "device1/sensor/+" ); + client.updateSettings( changedSettings ); + simulateIoTDevices(); + assertEquals( 3, client.getTopicsMap().get( "device1/sensor/+" ).intValue() ); + } + + + @Test + public void topicMapUpdatedCorrectlyWithWildcardPlusInMiddleTest() { + changedSettings.replace( "topics", "device2/+/info" ); + client.updateSettings( changedSettings ); + simulateIoTDevices(); + assertEquals( 2, client.getTopicsMap().get( "device2/+/info" ).intValue() ); + } + + + @Test + public void topicMapUpdatedCorrectlyWithWildcardPlusAtBeginningTest() { + changedSettings.replace( "topics", "+/online" ); + client.updateSettings( changedSettings ); + simulateIoTDevices(); + assertEquals( 2, client.getTopicsMap().get( "+/online" ).intValue() ); + } + + + @Test + public void reloadSettingsCatchAllEntityToFalseTest() { + + changedSettings.replace( "topics", "device1/online, device2/+/info, device1/sensor/#" ); + client.updateSettings( changedSettings ); + changedSettings.replace( "catchAllEntity", "false" ); + client.updateSettings( changedSettings ); + assertFalse( client.getCatchAllEntity().get() ); + + Catalog catalog1 = Catalog.getInstance(); + Pattern pattern1 = new Pattern( "device1_online" ); + List collectionList1 = catalog1.getCollections( MqttStreamClientTest.Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern1 ); + assertFalse( collectionList1.isEmpty() ); + + Catalog catalog2 = Catalog.getInstance(); + Pattern pattern2 = new Pattern( "device2___info" ); + List collectionList2 = catalog2.getCollections( Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern2 ); + assertFalse( collectionList2.isEmpty() ); + + Catalog catalog3 = Catalog.getInstance(); + Pattern pattern3 = new Pattern( "device1_sensor__" ); + List collectionList3 = catalog3.getCollections( Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern3 ); + assertFalse( collectionList3.isEmpty() ); + } + + + @Test + public void getWildcardTopicWithHashtagTest() { + changedSettings.replace( "topics", "device1/sensor/#"); + client.updateSettings( changedSettings ); + String resultWildcardTopic1 = client.getWildcardTopic( 
"device1/sensor/measurements/unit" ); + assertEquals( "device1/sensor/#", resultWildcardTopic1 ); + } + + + @Test + public void getWildcardTopicWithPlusTest() { + changedSettings.replace( "topics", "+/online"); + client.updateSettings( changedSettings ); + String resultWildcardTopic = client.getWildcardTopic( "device1/online" ); + assertEquals( "+/online", resultWildcardTopic ); + } + +} diff --git a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamClientTest.java b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamClientTest.java new file mode 100644 index 0000000000..e066fb3eca --- /dev/null +++ b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamClientTest.java @@ -0,0 +1,323 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.Catalog.NamespaceType; +import org.polypheny.db.catalog.Catalog.Pattern; +import org.polypheny.db.catalog.entity.CatalogCollection; +import org.polypheny.db.catalog.entity.CatalogSchema; +import org.polypheny.db.catalog.exceptions.UnknownSchemaException; +import org.polypheny.db.iface.QueryInterface; +import org.polypheny.db.iface.QueryInterfaceManager; +import org.polypheny.db.mqtt.MqttStreamPlugin.MqttStreamClient; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionManager; + +public class MqttStreamClientTest { + + static TransactionManager transactionManager; + static Transaction transaction; + static Map initialSettings = new HashMap<>(); + static Map changedSettings = new HashMap<>(); + + MqttStreamClient client; + + + @BeforeClass + public static void init() { + TestHelper testHelper = TestHelper.getInstance(); + transactionManager = testHelper.getTransactionManager(); + transaction = testHelper.getTransaction(); + + } + + + @Before + public void resetSettings() { + initialSettings.clear(); + initialSettings.put( "brokerAddress", "localhost" ); + initialSettings.put( "brokerPort", "1883" ); + initialSettings.put( "catchAllEntityName", "testCollection" ); + initialSettings.put( "catchAllEntity", "true" ); + initialSettings.put( "namespace", "testNamespace" ); + initialSettings.put( "namespaceType", "DOCUMENT" ); + initialSettings.put( "topics", "" ); + initialSettings.put( "Tsl/SslConnection", "false" ); + initialSettings.put( "filterQuery", "" ); + + QueryInterface iface = QueryInterfaceManager.getInstance().getQueryInterface( "mqtt" ); + + client = new MqttStreamClient( + 
transactionManager, + null, + iface.getQueryInterfaceId(), + iface.getUniqueName(), + initialSettings ); + + changedSettings.clear(); + changedSettings.put( "catchAllEntityName", "testCollection" ); + changedSettings.put( "catchAllEntity", "true" ); + changedSettings.put( "namespace", "testNamespace" ); + //changedSettings.put( "namespaceType", "DOCUMENT"); + changedSettings.put( "topics", "" ); + changedSettings.put( "filterQuery", "" ); + } + + + @Test + public void saveQueryEmptyStringTest() { + changedSettings.replace( "filterQuery", " " ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 0 ); + assertEquals( expected, client.getFilterMap() ); + } + + + @Test + public void saveSimpleQueryTest() { + changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value1\"}" ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 1 ); + expected.put( "topic1", "{\"key1\":\"value1\"}" ); + assertEquals( expected, client.getFilterMap() ); + } + + + @Test + public void saveQueryToExistingTopicTest() { + changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value2\"}" ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 1 ); + expected.put( "topic1", "{\"key1\":\"value2\"}" ); + assertEquals( expected, client.getFilterMap() ); + } + + + @Test + public void saveQueryWithArrayTest() { + changedSettings.replace( "filterQuery", "topic1:{\"key1\":[1, 2, 3]}" ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 1 ); + expected.put( "topic1", "{\"key1\":[1, 2, 3]}" ); + assertEquals( expected, client.getFilterMap() ); + } + + + @Test + public void saveTwoSimpleQueryTest() { + changedSettings.replace( "filterQuery", "topic1:{\"key1\":\"value1\"}, topic2:{\"key2\":\"value2\"}" ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 2 ); + expected.put( "topic1", "{\"key1\":\"value1\"}" ); + expected.put( "topic2", "{\"key2\":\"value2\"}" ); + assertEquals( expected, client.getFilterMap() ); + } + + + @Test + public void saveNestedQueryTest() { + changedSettings.replace( "filterQuery", "topic1:{\"key1\":{$lt:3}}, topic2:{$or:[\"key2\":{$lt:3}, \"key2\":{$gt:5}]}" ); + client.updateSettings( changedSettings ); + Map expected = new HashMap<>( 2 ); + expected.put( "topic1", "{\"key1\":{$lt:3}}" ); + expected.put( "topic2", "{$or:[\"key2\":{$lt:3}, \"key2\":{$gt:5}]}" ); + assertEquals( expected, client.getFilterMap() ); + } + + + + + + @Test + public void toListEmptyTest() { + List result = client.toList( "" ); + List expected = new ArrayList<>(); + assertEquals( expected, result ); + } + + + @Test + public void toListSpaceTest() { + List result = client.toList( " " ); + List expected = new ArrayList<>(); + assertEquals( expected, result ); + } + + + @Test + public void toListWithContentTest() { + List result = client.toList( "1, 2 " ); + List expected = new ArrayList<>(); + expected.add( "1" ); + expected.add( "2" ); + assertEquals( expected, result ); + } + + + @Test + public void addFirstMessageToQueueTest() { + ConcurrentLinkedQueue msgQueueBefore = client.getMessageQueue(); + assertEquals( 0, msgQueueBefore.size() ); + client.addMessageToQueue( "topic1", "payload1" ); + ConcurrentLinkedQueue msgQueueAfter = client.getMessageQueue(); + assertEquals( 1, msgQueueAfter.size() ); + String[] expected = { "topic1", "payload1" }; + assertEquals( expected, msgQueueAfter.poll() ); + } + + + @Test + public void addTwentyOneMessagesToQueueTest() { + for ( int i = 0; i < 22; i++ ) { + 
client.addMessageToQueue( "topic1", String.valueOf( i ) ); + } + ConcurrentLinkedQueue msgQueueAfter = client.getMessageQueue(); + assertEquals( 20, msgQueueAfter.size() ); + String[] expected = { "topic1", String.valueOf( 2 ) }; + assertEquals( expected, msgQueueAfter.poll() ); + } + + + @Test + public void reloadSettingsNewNamespaceTest() { + // change to new namespace: + changedSettings.replace( "namespace", "namespace2" ); + client.updateSettings( changedSettings ); + assertEquals( "namespace2", client.getNamespaceName() ); + } + + + @Test + public void reloadSettingsExistingNamespaceTest() { + // change to existing namespace: + changedSettings.replace( "namespace", "testNamespace" ); + client.updateSettings( changedSettings ); + assertEquals( "testNamespace", client.getNamespaceName() ); + } + + + @Test(expected = RuntimeException.class) + public void reloadSettingsWrongTypeNamespaceTest() { + // change to existing namespace other type: + changedSettings.replace( "namespace", "public" ); + client.updateSettings( changedSettings ); + assertEquals( "testNamespace", client.getNamespaceName() ); + } + + + @Test + public void reloadSettingsCatchAllEntityToTrueTest() { + changedSettings.replace( "catchAllEntity", "true" ); + client.updateSettings( changedSettings ); + assertTrue( client.getCatchAllEntity().get() ); + + Catalog catalog = Catalog.getInstance(); + Pattern pattern = new Pattern( client.getCatchAllEntityName() ); + List collectionList = catalog.getCollections( Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern ); + assertFalse( collectionList.isEmpty() ); + } + + + @Test + public void reloadSettingsNewCatchAllEntityNameTest() { + changedSettings.replace( "catchAllEntityName", "buttonCollection" ); + client.updateSettings( changedSettings ); + assertEquals( "buttonCollection", client.getCatchAllEntityName() ); + assertTrue( client.getCatchAllEntity().get() ); + + Catalog catalog = Catalog.getInstance(); + Pattern pattern = new Pattern( client.getCatchAllEntityName() ); + List collectionList = catalog.getCollections( Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern ); + assertFalse( collectionList.isEmpty() ); + } + + + @Test + public void reloadSettingsExistingCatchAllEntityNameTest() { + changedSettings.replace( "catchAllEntityName", "testCollection" ); + client.updateSettings( changedSettings ); + assertEquals( "testCollection", client.getCatchAllEntityName() ); + assertTrue( client.getCatchAllEntity().get() ); + } + + + @Test + public void reloadSettingsCatchAllEntityAndCatchAllEntityNameTest() { + // testing special case: catchAllEntity changed from false to true + catchAllEntityName changes + changedSettings.replace( "catchAllEntity", "false" ); + client.updateSettings( changedSettings ); + + changedSettings.replace( "catchAllEntityName", "buttonCollection" ); + changedSettings.replace( "catchAllEntity", "true" ); + client.updateSettings( changedSettings ); + assertEquals( "buttonCollection", client.getCatchAllEntityName() ); + assertTrue( client.getCatchAllEntity().get() ); + + Catalog catalog = Catalog.getInstance(); + Pattern pattern = new Pattern( "buttonCollection" ); + List collectionList = catalog.getCollections( Helper.getExistingNamespaceId( client.getNamespaceName(), client.getNamespaceType() ), pattern ); + assertFalse( collectionList.isEmpty() ); + } + + + @Test(expected = NullPointerException.class) + public void reloadSettingsCatchAllEntityAndCatchAllEntityNameTest2() { + 
// testing special case: catchAllEntity changed from false to true + catchAllEntityName changes + changedSettings.replace( "catchAllEntity", "false" ); + client.updateSettings( changedSettings ); + + changedSettings.replace( "catchAllEntityName", " " ); + changedSettings.replace( "catchAllEntity", "true" ); + client.updateSettings( changedSettings ); + assertEquals( "testCollection", client.getCatchAllEntityName() ); + assertTrue( client.getCatchAllEntity().get() ); + } + + static class Helper { + static long getExistingNamespaceId( String namespaceName, NamespaceType namespaceType ) { + Catalog catalog = Catalog.getInstance(); + CatalogSchema schema; + try { + schema = catalog.getSchema( Catalog.defaultDatabaseId, namespaceName ); + } catch ( UnknownSchemaException e ) { + throw new RuntimeException( e ); + } + assert schema != null; + if ( schema.namespaceType == namespaceType ) { + return schema.id; + } else { + throw new RuntimeException( "There is already a namespace existing in this database with the given name but of another type. Please change the namespace name or the type." ); + } + } + } + + +} diff --git a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamProcessorTest.java b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamProcessorTest.java new file mode 100644 index 0000000000..5de06de51d --- /dev/null +++ b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/MqttStreamProcessorTest.java @@ -0,0 +1,209 @@ +/* + * Copyright 2019-2023 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.polypheny.db.mqtt; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; + +public class MqttStreamProcessorTest { + + @BeforeClass + public static void init() { + TestHelper testHelper = TestHelper.getInstance(); + + } + + + @Test + public void filterTestForSingleNumberMessage() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":10}"; + MqttMessage mqttMessage = new MqttMessage( "10", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + + } + + + @Test + public void filterTestForSingleNumberMessage2() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":10}"; + MqttMessage mqttMessage = new MqttMessage( "15", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertFalse( streamProcessor.applyFilter() ); + + } + + + @Test + public void filterTestForSingleStringMessage1() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":\"shouldMatch\"}"; + MqttMessage mqttMessage = new MqttMessage( "shouldMatch", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + + } + + + @Test + public void filterTestForSingleStringMessage2() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":\"shouldNot\"}"; + MqttMessage mqttMessage = new MqttMessage( "shouldNotMatch", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor2 = new MqttStreamProcessor( filteringMqttMessage, st ); + assertFalse( streamProcessor2.applyFilter() ); + } + + + @Test + public void filterTestForArrayMessage() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":10}"; + MqttMessage mqttMessage = new MqttMessage( "[10]", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForArrayMessage2() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":[10]}"; + MqttMessage mqttMessage = new MqttMessage( "[10]", "button/battery" ); + FilteringMqttMessage filteringMqttMessage 
= new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForArrayMessageFalse() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":10}"; + MqttMessage mqttMessage = new MqttMessage( "[15, 14]", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertFalse( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForBooleanMessageTrue() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":true}"; + MqttMessage mqttMessage = new MqttMessage( "true", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForBooleanMessageFalse() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"$$ROOT\":true}"; + MqttMessage mqttMessage = new MqttMessage( "false", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertFalse( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForJsonNumberMessage() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"count\":10}"; + MqttMessage mqttMessage = new MqttMessage( "{\"count\":10}", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForJsonArrayMessage() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"array\":10}"; + MqttMessage mqttMessage = new MqttMessage( "{\"array\":[10]}", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void filterTestForJsonStringMessage() { + Transaction transaction = TestHelper.getInstance().getTransaction(); + Statement st = transaction.createStatement(); + String filterQuery = "{\"content\":\"online\"}"; + MqttMessage mqttMessage = new MqttMessage( "{\"content\":\"online\"}", "button/battery" ); + FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery ); + MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st ); + assertTrue( streamProcessor.applyFilter() ); + } + + + @Test + public void 
filterTestForJsonStringMessage2() {
+ Transaction transaction = TestHelper.getInstance().getTransaction();
+ Statement st = transaction.createStatement();
+ String filterQuery = "{\"content\":\"online\"}";
+ MqttMessage mqttMessage = new MqttMessage( "{\"content\":\"offline\"}", "button/battery" );
+ FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery );
+ MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st );
+ assertFalse( streamProcessor.applyFilter() );
+ }
+
+
+ // Filter using dot notation that matches a field inside a nested document:
+ @Test
+ public void filterTestForNestedDocumentMessage() {
+ Transaction transaction = TestHelper.getInstance().getTransaction();
+ Statement st = transaction.createStatement();
+ String filterQuery = "{\"mqtt.status\":\"online\"}";
+ MqttMessage mqttMessage = new MqttMessage( "{\"mqtt\":{\"status\":\"online\"}}", "button/battery" );
+ FilteringMqttMessage filteringMqttMessage = new FilteringMqttMessage( mqttMessage, filterQuery );
+ MqttStreamProcessor streamProcessor = new MqttStreamProcessor( filteringMqttMessage, st );
+ assertTrue( streamProcessor.applyFilter() );
+ }
+
+
+} diff --git a/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/StreamCaptureTest.java b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/StreamCaptureTest.java new file mode 100644 index 0000000000..650b3f8216 --- /dev/null +++ b/plugins/mqtt-stream/src/test/java/org/polypheny/db/mqtt/StreamCaptureTest.java @@ -0,0 +1,238 @@ +/*
+ * Copyright 2019-2023 The Polypheny Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.polypheny.db.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.junit.BeforeClass; +import org.junit.Test; +import org.polypheny.db.TestHelper; +import org.polypheny.db.adapter.DataStore; +import org.polypheny.db.algebra.AlgRoot; +import org.polypheny.db.catalog.Catalog; +import org.polypheny.db.catalog.Catalog.NamespaceType; +import org.polypheny.db.catalog.Catalog.PlacementType; +import org.polypheny.db.catalog.exceptions.NoTablePrimaryKeyException; +import org.polypheny.db.ddl.DdlManager; +import org.polypheny.db.languages.QueryLanguage; +import org.polypheny.db.languages.QueryParameters; +import org.polypheny.db.languages.mql.MqlFind; +import org.polypheny.db.languages.mql.MqlQueryParameters; +import org.polypheny.db.languages.mql2alg.MqlToAlgConverter; +import org.polypheny.db.plan.AlgOptCluster; +import org.polypheny.db.prepare.PolyphenyDbCatalogReader; +import org.polypheny.db.processing.Processor; +import org.polypheny.db.tools.AlgBuilder; +import org.polypheny.db.transaction.Statement; +import org.polypheny.db.transaction.Transaction; +import org.polypheny.db.transaction.TransactionException; + +public class StreamCaptureTest { + + static StreamCapture capture; + static Transaction transaction; + static long namespaceId; + + + @BeforeClass + public static void init() { + TestHelper testHelper = TestHelper.getInstance(); + transaction = testHelper.getTransaction(); + capture = new StreamCapture( transaction ); + namespaceId = Helper.createNamespace(); + Helper.createCollection(); + } + + + @Test + public void insertIntTest() { + MqttMessage msg = new MqttMessage( "25", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + // getting content with index 0 because there will only be one document matching to this query + BsonDocument result = Helper.filter( "{\"payload\":25}" ).get( 0 ); + + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + assertEquals( 25, result.get( "payload" ).asInt32().intValue() ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + } + + + @Test + public void insertDoubleTest() { + MqttMessage msg = new MqttMessage( "25.54", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + BsonDocument result = Helper.filter( "{\"payload\":25.54}" ).get( 0 ); + + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + assertEquals( 25.54, result.get( "payload" ).asDouble().getValue(), 0.1 ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + } + + + @Test + public void insertStringTest() { + MqttMessage msg = new MqttMessage( "String", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture 
capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + BsonDocument result = Helper.filter( "{\"payload\":\"String\"}" ).get( 0 ); + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + assertEquals( "String", result.get( "payload" ).asString().getValue() ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + } + + + @Test + public void insertBooleanTest() { + MqttMessage msg = new MqttMessage( "true", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + BsonDocument result = Helper.filter( "{\"payload\":true}" ).get( 0 ); + + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + assertTrue( result.get( "payload" ).asBoolean().getValue() ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + } + + + @Test + public void insertJsonTest() { + MqttMessage msg = new MqttMessage( "{\"key1\":\"value1\", \"key2\":true, \"key3\":3}", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + BsonDocument result = Helper.filter( "{\"payload.key1\":\"value1\"}" ).get( 0 ); + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + assertEquals( "value1", result.get( "payload" ).asDocument().get( "key1" ).asString().getValue() ); + assertTrue( result.get( "payload" ).asDocument().get( "key2" ).asBoolean().getValue() ); + assertEquals( 3, result.get( "payload" ).asDocument().get( "key3" ).asInt32().getValue() ); + + } + + + @Test + public void insertArrayTest() { + MqttMessage msg = new MqttMessage( "[1, 2, 3]", "testTopic" ); + StoringMqttMessage storingmsg = new StoringMqttMessage( msg, "testspace", NamespaceType.DOCUMENT, "streamCaptureTest", Catalog.defaultDatabaseId, Catalog.defaultUserId, "testCollection" ); + //StreamCapture capture = new StreamCapture( transaction ); + capture.insert( storingmsg ); + BsonDocument result = Helper.filter( "{\"payload\":[1, 2, 3]}" ).get( 0 ); + assertEquals( "testTopic", result.get( "topic" ).asString().getValue() ); + BsonArray expectedPayload = new BsonArray(); + expectedPayload.add( 0, new BsonInt32( 1 ) ); + expectedPayload.add( 1, new BsonInt32( 2 ) ); + expectedPayload.add( 2, new BsonInt32( 3 ) ); + assertEquals( expectedPayload, result.get( "payload" ).asArray() ); + assertEquals( "streamCaptureTest", result.get( "source" ).asString().getValue() ); + } + + + @Test + public void isIntTest() { + //StreamCapture capture = new StreamCapture( transaction ); + assertTrue( capture.isInteger( "1" ) ); + } + + + @Test + public void isDoubleTest() { + assertTrue( capture.isDouble( "1.0" ) ); + } + + + @Test + public void isBooleanTest() { + assertTrue( capture.isBoolean( "false" ) ); + } + + + private static class Helper { + + private static long createNamespace() { + Catalog catalog = Catalog.getInstance(); + long id = catalog.addNamespace( "testspace", Catalog.defaultDatabaseId, Catalog.defaultUserId, NamespaceType.DOCUMENT ); + try { + 
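// commit the catalog transaction so the newly added namespace is persisted +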
catalog.commit();
+ return id;
+ } catch ( NoTablePrimaryKeyException e ) {
+ throw new RuntimeException( e );
+ }
+ }
+
+
+ private static void createCollection() {
+ Statement statement = transaction.createStatement();
+ try {
+ List<DataStore> dataStores = new ArrayList<>();
+ DdlManager.getInstance().createCollection(
+ namespaceId,
+ "testCollection",
+ true,
+ dataStores.size() == 0 ? null : dataStores,
+ PlacementType.MANUAL,
+ statement );
+ transaction.commit();
+ } catch ( Exception e ) {
+ throw new RuntimeException( "Error while creating a new collection:", e );
+ }
+ }
+
+
+ // Builds a MongoQL find() on the test collection, executes it and returns the matching documents.
+ private static List<BsonDocument> filter( String query ) {
+ Statement statement = transaction.createStatement();
+ QueryParameters parameters = new MqlQueryParameters( String.format( "db.%s.find(%s)", "testCollection", query ), "testspace", NamespaceType.DOCUMENT );
+ AlgBuilder algBuilder = AlgBuilder.create( statement );
+ Processor mqlProcessor = transaction.getProcessor( QueryLanguage.from( "mongo" ) );
+ PolyphenyDbCatalogReader catalogReader = transaction.getCatalogReader();
+ final AlgOptCluster cluster = AlgOptCluster.createDocument( statement.getQueryProcessor().getPlanner(), algBuilder.getRexBuilder() );
+ MqlToAlgConverter mqlConverter = new MqlToAlgConverter( mqlProcessor, catalogReader, cluster );
+
+ MqlFind find = (MqlFind) mqlProcessor.parse( String.format( "db.%s.find(%s)", "testCollection", query ) ).get( 0 );
+
+ AlgRoot root = mqlConverter.convert( find, parameters );
+
+ List<List<Object>> res = capture.executeAndTransformPolyAlg( root, statement, statement.getPrepareContext() );
+ List<String> result = new ArrayList<>();
+ for ( List<Object> objectsList : res ) {
+ result.add( objectsList.get( 0 ).toString() );
+ }
+
+ List<BsonDocument> listOfMessage = new ArrayList<>();
+ for ( String documentString : result ) {
+ BsonDocument doc = BsonDocument.parse( documentString );
+ listOfMessage.add( doc );
+ }
+ return listOfMessage;
+ }
+
+
+ }
+
+
+} diff --git a/settings.gradle b/settings.gradle index e86ba4affb..93081340d2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,6 +30,7 @@ include 'plugins:jdbc-adapter-framework'
include 'plugins:avatica-interface'
include 'plugins:rest-interface'
include 'plugins:http-interface'
+include 'plugins:mqtt-stream'
// adapters plugins
include 'plugins:hsqldb-adapter'
diff --git a/webui/src/main/java/org/polypheny/db/webui/crud/LanguageCrud.java b/webui/src/main/java/org/polypheny/db/webui/crud/LanguageCrud.java index fde6f84be0..2f210312eb 100644 --- a/webui/src/main/java/org/polypheny/db/webui/crud/LanguageCrud.java +++ b/webui/src/main/java/org/polypheny/db/webui/crud/LanguageCrud.java @@ -162,7 +162,7 @@ public static void printLog( Throwable t, QueryRequest request ) {
public static void attachError( Transaction transaction, List<Result> results, String query, Throwable t ) {
- //String msg = t.getMessage() == null ? "" : t.getMessage();
+ //String msg = t.getData() == null ? "" : t.getData();
Result result = new Result( t ).setGeneratedQuery( query ).setXid( transaction.getXid().toString() );
if ( transaction.isActive() ) {