Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit bfdbd34

Browse files
authored
Add Arion gateway (#748)
1 parent a1dbde6 commit bfdbd34

File tree

17 files changed

+564
-27
lines changed

17 files changed

+564
-27
lines changed

kubernetes/services/dpm_manager.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ data:
6161
host.ip.to.group.topic.map=group-topic1:192.168.131.131,10.10.10.11 group-topic2:192.168.131.131,11.11.11.12
6262
group.topic.to.multicast.topic.map=multicast-topic1:group-topic1,group-topic3 multicast-topic2:group-topic2,group-topic4
6363
64+
65+
arionGateway.enabled = false
66+
arionMaster.server = 127.0.0.1
67+
arionMaster.port = 9090
68+
6469
zetaGateway.enabled=false
6570
zetaGateway.node.mac=e0:97:96:02:45:53
6671
microservices.node.service.url=http://nodemanager-service.default.svc.cluster.local:9007/nodes

lib/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,22 @@ Copyright(c) 2020 Futurewei Cloud
8181
<dependency>
8282
<groupId>io.grpc</groupId>
8383
<artifactId>grpc-netty-shaded</artifactId>
84-
<version>1.23.0</version>
84+
<version>1.42.2</version>
8585
</dependency>
8686
<dependency>
8787
<groupId>io.grpc</groupId>
8888
<artifactId>grpc-protobuf</artifactId>
89-
<version>1.23.0</version>
89+
<version>1.42.2</version>
9090
</dependency>
9191
<dependency>
9292
<groupId>io.grpc</groupId>
9393
<artifactId>grpc-stub</artifactId>
94-
<version>1.23.0</version>
94+
<version>1.42.2</version>
9595
</dependency>
9696
<dependency>
9797
<groupId>io.grpc</groupId>
9898
<artifactId>protoc-gen-grpc-java</artifactId>
99-
<version>1.23.0</version>
99+
<version>1.42.2</version>
100100
<type>pom</type>
101101
</dependency>
102102

schema/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,22 @@
3636
<dependency>
3737
<groupId>io.grpc</groupId>
3838
<artifactId>grpc-netty-shaded</artifactId>
39-
<version>1.23.0</version>
39+
<version>1.42.2</version>
4040
</dependency>
4141
<dependency>
4242
<groupId>io.grpc</groupId>
4343
<artifactId>grpc-protobuf</artifactId>
44-
<version>1.23.0</version>
44+
<version>1.42.2</version>
4545
</dependency>
4646
<dependency>
4747
<groupId>io.grpc</groupId>
4848
<artifactId>grpc-stub</artifactId>
49-
<version>1.23.0</version>
49+
<version>1.42.2</version>
5050
</dependency>
5151
<dependency>
5252
<groupId>io.grpc</groupId>
5353
<artifactId>protoc-gen-grpc-java</artifactId>
54-
<version>1.23.0</version>
54+
<version>1.42.2</version>
5555
<type>pom</type>
5656
</dependency>
5757
<dependency>
@@ -62,7 +62,7 @@
6262
<dependency>
6363
<groupId>io.grpc</groupId>
6464
<artifactId>grpc-testing</artifactId>
65-
<version>1.23.0</version>
65+
<version>1.42.2</version>
6666
<scope>test</scope>
6767
</dependency>
6868
</dependencies>

schema/proto3/gateway.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ enum GatewayType {
2929
TGW = 2; // Transit Gateway
3030
IGW = 3; // Internet Gateway
3131
NGW = 4; // NAT Gateway
32+
ARION = 5;
3233
}
3334

3435
message GatewayConfiguration {
@@ -50,8 +51,17 @@ message GatewayConfiguration {
5051
uint32 port_inband_operation = 1;
5152
}
5253

54+
message arion {
55+
string vpc_id = 1;
56+
uint32 vni = 2;
57+
string subnet_id = 3;
58+
// port for in-band (same NIC channel) operation
59+
uint32 port_inband_operation = 4;
60+
}
61+
5362
oneof extra_info {
5463
zeta zeta_info = 6;
64+
arion arion_info = 7;
5565
}
5666
}
5767

schema/proto3/goalstateprovisioner.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ option java_package = "com.futurewei.alcor.schema";
2222

2323
import "common.proto";
2424
import "goalstate.proto";
25+
import "neighbor.proto";
2526

2627
service GoalStateProvisioner {
2728

@@ -49,6 +50,8 @@ service GoalStateProvisioner {
4950
rpc RequestGoalStates (HostRequest) returns (HostRequestReply) {
5051
}
5152

53+
rpc PushGoalstates (NeighborRulesRequest) returns (GoalStateOperationReply) {
54+
}
5255
}
5356

5457
message HostRequest {
@@ -83,6 +86,13 @@ message HostRequestReply {
8386
uint32 total_operation_time = 3;
8487
}
8588

89+
message NeighborRulesRequest {
90+
uint32 format_version = 1;
91+
string request_id = 2;
92+
93+
repeated NeighborState neigborstates = 3;
94+
}
95+
8696
message GoalStateOperationReply {
8797
uint32 format_version = 1;
8898

schema/proto3/neighbor.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ message NeighborConfiguration {
4343
NeighborType neighbor_type = 1;
4444
string subnet_id = 2;
4545
string ip_address = 3;
46+
string arion_group = 4;
47+
uint32 tunnel_id = 5;
48+
string mac_address = 6;
4649
}
4750

4851
message AllowAddressPair {

services/data_plane_manager/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ Copyright(c) 2020 Futurewei Cloud
8686
<artifactId>junit-jupiter</artifactId>
8787
<scope>test</scope>
8888
</dependency>
89-
89+
<dependency>
90+
<groupId>com.github.ishugaliy</groupId>
91+
<artifactId>allgood-consistent-hash</artifactId>
92+
<version>1.0.0</version>
93+
</dependency>
9094
<dependency>
9195
<groupId>org.springframework.boot</groupId>
9296
<artifactId>spring-boot-starter-test</artifactId>
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
MIT License
3+
Copyright(c) 2020 Futurewei Cloud
4+
5+
Permission is hereby granted,
6+
free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction,
7+
including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons
8+
to whom the Software is furnished to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
11+
12+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
14+
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
15+
*/
16+
package com.futurewei.alcor.dataplane.cache;
17+
18+
import com.futurewei.alcor.common.db.CacheException;
19+
import com.futurewei.alcor.common.db.CacheFactory;
20+
import com.futurewei.alcor.common.db.ICache;
21+
import com.futurewei.alcor.common.stats.DurationStatistics;
22+
import com.futurewei.alcor.dataplane.entity.ArionGroup;
23+
import com.futurewei.alcor.dataplane.entity.ArionWing;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.springframework.context.annotation.ComponentScan;
26+
import org.springframework.stereotype.Repository;
27+
28+
import java.util.Collection;
29+
import java.util.Map;
30+
import java.util.Set;
31+
32+
@Repository
33+
@ComponentScan(value="com.futurewei.alcor.common.db")
34+
public class ArionWingCache {
35+
36+
// arionWingCache store Arion wing meta data. key is Arion Wing hash code and value is Arion wing meta data.
37+
private ICache<String, ArionWing> arionWingCache;
38+
39+
// arionWingGroupCache store Arion group meta data. key is Arion wing group name, value is Arion group meta data.
40+
private ICache<String, ArionGroup> arionWingGroupCache;
41+
private CacheFactory cacheFactory;
42+
43+
@Autowired
44+
public ArionWingCache(CacheFactory cacheFactory) {
45+
this.cacheFactory = cacheFactory;
46+
arionWingCache = cacheFactory.getCache(ArionWing.class);
47+
arionWingGroupCache = cacheFactory.getCache(ArionGroup.class);
48+
}
49+
50+
@DurationStatistics
51+
public ArionWing getArionWing (String resourceId) throws CacheException {
52+
return arionWingCache.get(resourceId);
53+
}
54+
55+
@DurationStatistics
56+
public Collection<ArionWing> getArionWings () throws CacheException {
57+
return arionWingCache.getAll().values();
58+
}
59+
60+
@DurationStatistics
61+
public Map<String, ArionWing> getAllSubnetPorts(Map<String, Object[]> queryParams) throws CacheException {
62+
return arionWingCache.getAll(queryParams);
63+
}
64+
65+
@DurationStatistics
66+
public Collection<ArionWing> getArionWings (Set<String> keys) throws CacheException {
67+
return arionWingCache.getAll(keys).values();
68+
}
69+
70+
@DurationStatistics
71+
public void insertArionWing (ArionWing arionWing) throws CacheException {
72+
arionWingCache.put(String.valueOf(arionWing.hashCode()), arionWing);
73+
}
74+
75+
@DurationStatistics
76+
public void deleteArionWing (String resourceId) throws CacheException {
77+
arionWingCache.remove(resourceId);
78+
}
79+
80+
@DurationStatistics
81+
public Object getArionGroup (String resourceId) throws CacheException {
82+
return arionWingGroupCache.get(resourceId);
83+
}
84+
85+
@DurationStatistics
86+
public void insertArionGroup (String resourceId) throws CacheException {
87+
System.out.println("Insert arion group: " + resourceId);
88+
arionWingGroupCache.put(resourceId, new ArionGroup(resourceId));
89+
}
90+
91+
@DurationStatistics
92+
public void deleteArionGroup (String resourceId) throws CacheException {
93+
arionWingGroupCache.remove(resourceId);
94+
}
95+
96+
@DurationStatistics
97+
public Map<String, ArionGroup> getAllArionGroup () throws CacheException {
98+
return arionWingGroupCache.getAll();
99+
}
100+
101+
}

services/data_plane_manager/src/main/java/com/futurewei/alcor/dataplane/client/grpc/DataPlaneClientImplV2.java

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ public class DataPlaneClientImplV2 implements DataPlaneClient<UnicastGoalStateV2
5959
// prints out UUID and time, when sending a GoalState to any of the monitorHosts
6060
private ArrayList<String> monitorHosts;
6161

62+
@Value("${arionGateway.enabled:false}")
63+
private boolean arionGatwayEnabled;
64+
65+
@Value("${arionGateway.server:127.0.0.1}")
66+
private String arionMasterServer;
67+
68+
@Value("${arionGateway.port:9090}")
69+
private int arionMasterPort;
70+
6271
@Value("${microservices.connectTimeout:300}")
6372
private String connectTimeout;
6473

@@ -72,6 +81,9 @@ public List<String> sendGoalStates(List<UnicastGoalStateV2> unicastGoalStates) t
7281
for (UnicastGoalStateV2 unicastGoalState : unicastGoalStates) {
7382
goalStateBuilder = getGoalState(goalStateBuilder, unicastGoalState);
7483
}
84+
if (arionGatwayEnabled) {
85+
doSendGoalStateToArionMaster(goalStateBuilder);
86+
}
7587
doSendGoalState(goalStateBuilder.build(), finishLatch, results);
7688

7789
if (!finishLatch.await(Integer.parseInt(connectTimeout), TimeUnit.SECONDS)) {
@@ -149,8 +161,8 @@ public List<String> sendGoalStates(List<UnicastGoalStateV2> unicastGoalStates, M
149161
return null;
150162
}
151163

152-
private GrpcChannelStub createGrpcChannelStub(String hostIp) {
153-
ManagedChannel channel = ManagedChannelBuilder.forAddress(hostIp, this.hostAgentPort)
164+
private GrpcChannelStub createGrpcChannelStub(String hostIp, int port) {
165+
ManagedChannel channel = ManagedChannelBuilder.forAddress(hostIp, port)
154166
.usePlaintext()
155167
.keepAliveWithoutCalls(true)
156168
.keepAliveTime(Long.MAX_VALUE, TimeUnit.SECONDS)
@@ -160,30 +172,31 @@ private GrpcChannelStub createGrpcChannelStub(String hostIp) {
160172
return new GrpcChannelStub(channel, asyncStub);
161173
}
162174

163-
private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
164-
if (!this.hostIpGrpcChannelStubMap.containsKey(hostIp)) {
165-
this.hostIpGrpcChannelStubMap.put(hostIp, createGrpcChannelStubArrayList(hostIp));
175+
private GrpcChannelStub getOrCreateGrpcChannel(String hostIp, Integer port) {
176+
if (!this.hostIpGrpcChannelStubMap.containsKey(hostIp + port)) {
177+
178+
this.hostIpGrpcChannelStubMap.put(hostIp + port, createGrpcChannelStubArrayList(hostIp, port));
166179
LOG.info("[getOrCreateGrpcChannel] Created a channel and stub to host IP: " + hostIp);
167180
}
168181
int usingChannelWithThisIndex = ThreadLocalRandom.current().nextInt(0, numberOfGrpcChannelPerHost);
169-
ManagedChannel chan = this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex).channel;
182+
ManagedChannel chan = this.hostIpGrpcChannelStubMap.get(hostIp + port).get(usingChannelWithThisIndex).channel;
170183
//checks the channel status, reconnects if the channel is IDLE
171184

172185
ConnectivityState channelState = chan.getState(true);
173186
if (channelState != ConnectivityState.READY && channelState != ConnectivityState.CONNECTING && channelState != ConnectivityState.IDLE) {
174-
GrpcChannelStub newChannelStub = createGrpcChannelStub(hostIp);
175-
this.hostIpGrpcChannelStubMap.get(hostIp).set(usingChannelWithThisIndex, newChannelStub);
187+
GrpcChannelStub newChannelStub = createGrpcChannelStub(hostIp, port);
188+
this.hostIpGrpcChannelStubMap.get(hostIp + port).set(usingChannelWithThisIndex, newChannelStub);
176189
LOG.info("[getOrCreateGrpcChannel] Replaced a channel and stub to host IP: " + hostIp);
177190
}
178191
LOG.info("[getOrCreateGrpcChannel] Using channel and stub index " + usingChannelWithThisIndex + " to host IP: " + hostIp);
179-
return this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex);
192+
return this.hostIpGrpcChannelStubMap.get(hostIp + port).get(usingChannelWithThisIndex);
180193
}
181194

182-
private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp) {
195+
private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp, int port) {
183196
long start = System.currentTimeMillis();
184197
ArrayList<GrpcChannelStub> arr = new ArrayList<>();
185198
for (int i = 0; i < numberOfGrpcChannelPerHost; i++) {
186-
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp);
199+
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp, port);
187200
// Using Linkerd load balance
188201
//warmUpChannelStub(channelStub, hostIp);
189202
arr.add(channelStub);
@@ -232,10 +245,34 @@ public void onCompleted() {
232245
return;
233246
}
234247

248+
private String doSendGoalStateToArionMaster (Goalstate.GoalStateV2.Builder goalStateV2) {
249+
GrpcChannelStub channelStub = getOrCreateGrpcChannel(arionMasterServer, arionMasterPort);
250+
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;
251+
var neighborStateRequestBuilder = Goalstateprovisioner.NeighborRulesRequest.newBuilder();
252+
neighborStateRequestBuilder.addAllNeigborstates(goalStateV2.getNeighborStatesMap().values());
253+
asyncStub.pushGoalstates(neighborStateRequestBuilder.build(), new StreamObserver<Goalstateprovisioner.GoalStateOperationReply>() {
254+
@Override
255+
public void onNext(Goalstateprovisioner.GoalStateOperationReply goalStateOperationReply) {
256+
LOG.info("Get response: " + goalStateOperationReply.toString());
257+
}
258+
259+
@Override
260+
public void onError(Throwable throwable) {
261+
LOG.info(throwable.getMessage());
262+
}
263+
264+
@Override
265+
public void onCompleted() {
266+
267+
}
268+
});
269+
return null;
270+
}
271+
235272
private String doSendGoalState(Goalstate.GoalStateV2 goalStateV2, CountDownLatch finishLatch, List<String> replies) {
236273
String hostIp = netwconfigmanagerGrpcServiceUrl;
237274
long start = System.currentTimeMillis();
238-
GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp);
275+
GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp, hostAgentPort);
239276
long chan_established = System.currentTimeMillis();
240277
LOG.info("[doSendGoalState] Established channel, elapsed Time in milli seconds: " + (chan_established - start));
241278
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;
@@ -371,10 +408,10 @@ private Goalstate.GoalStateV2.Builder getGoalState(Goalstate.GoalStateV2.Builder
371408

372409

373410
if (goalStateBuilder.containsRouterStates(unicastGoalStateV2.getHostIp() + "/" + entry.getKey())) {
374-
Router.RouterConfiguration.Builder routerConfigurationBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).getConfiguration().toBuilder();
375-
routerConfigurationBuilder.addAllSubnetRoutingTables(entry.getValue().getConfiguration().getSubnetRoutingTablesList());
376-
Router.RouterState.Builder routerStateBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).toBuilder();
377-
routerStateBuilder.setConfiguration(routerConfigurationBuilder);
411+
Router.RouterConfiguration.Builder routerConfigurationBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).getConfiguration().toBuilder();
412+
routerConfigurationBuilder.addAllSubnetRoutingTables(entry.getValue().getConfiguration().getSubnetRoutingTablesList());
413+
Router.RouterState.Builder routerStateBuilder = goalStateBuilder.getRouterStatesMap().get(unicastGoalStateV2.getHostIp() + "/" + entry.getKey()).toBuilder();
414+
routerStateBuilder.setConfiguration(routerConfigurationBuilder);
378415
} else {
379416
goalStateBuilder.putRouterStates(unicastGoalStateV2.getHostIp() + "/" + entry.getKey(), entry.getValue());
380417
}

0 commit comments

Comments
 (0)