-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathabout.html
More file actions
750 lines (715 loc) · 37.2 KB
/
about.html
File metadata and controls
750 lines (715 loc) · 37.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="chrome=1">
<title>Pi</title>
<link rel="stylesheet" href="stylesheets/styles.css">
<link rel="stylesheet" href="stylesheets/pygment_trac.css">
<meta name="viewport" content="width=device-width, initial-scale=1, user-scalable=no">
<!--[if lt IE 9]>
<script src="//html5shiv.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
</head>
<body>
<div class="wrapper">
<header>
<h1>Pi</h1>
<p>Distributed Decentralised App Substrate</p>
<p class="view"><a href="/">Home</a></p>
<p class="view"><a href="about.html">About</a></p>
<p class="view"><a href="code.html">Code</a></p>
</header>
<section>
<h2>
Origins
</h2>
<p>
Pi came about as a result of a (at the time) strong desire by a major global telco
to explore new, game-changing ways of delivering combined software, infrastructure and
network services in the cloud.
</p>
<p>
Clearly an ambitious and demanding challenge, it was one that would stand or fall on ability to
<ol>
<li>rapidly develop and take to market entirely new infrastructure software services and capabilities,
<li>at scale,
<li>with resilience and service levels those of leading cloud providers
<li>whilst remaining competitive in terms of total cost of ownership
</ol>
</p>
<p>
Seeking to combine these four corners of design space is of course challenging and must
involve tradeoffs. The initial focus of our development effort - providing an EC2-compatible elastic
compute platform for internal use - provided ample opportunity to define, explore and evolve
these tradeoffs. It meant providing a way to deploy a distributed, resilient control
and automation software layer across many thousands of machines.
</p>
<h2>
Approach
</h2>
<p>
Architecturally, this context quickly led us to look beyond 'mainstream' platform design approaches.
There was a clear need for fine-grained control of huge amounts of distributed hardware, and co-ordination
of its resources. As a minimum, we were going to have to deploy a software agent on each of the
machines, so decisions became more focused on whether and to what extent to utilise this distributed
substrate for co-ordination and resource management. The alternative would have been to build a set of
centralised services using established service oriented patterns, storing and gathering data centrally
and controlling the resources remotely.
</p>
<p>
We opted for the former approach, fully aware that it would likely be more challenging to implement,
impose a generous helping of paradigm shifts upon its developers and maintainers, and be likely to get
us to the edge of uncharted waters. What ultimately pushed us down this path was the realisation that
a good balance between our four goals would depend on simplicity, automation and consistency. A significant
maintenance and management burden across would risk killing the commercial proposition, now or in the
future. Manual intervention was to be eliminated. We could not afford to solve problems
of resilience and scale in multiple ways or rely on costly specialised infrastructure. We were
keenly aware of the incompatibilities between fast time to market and a multitude of large teams or
traditional techniques for deploying and managing infrastructure services.
</p>
<p>
And ultimately, our goal was to help our organisation change the game, not stick to the rules!
</p>
<p>
A number of succesfully deployed decentralised architectures, above all Skype, BitTorren and Amazon's
Dynamo, provided extremely useful reference points in design space, as well as an illustration
of what was possible not only in the data centre but also 'in the wild'. At the same time, Adobe were
committing to a very ambitious effort to create a dynamic peer-to-peer network for media streaming
across its near-ubiquitous Flash player install base.
</p>
<p>
Thus the design goals focused on creating a generic, common software substrate for the development and
deployment highly scalable, highly robust software applications and services. It had to not only
provide application platform primitives such as communications, data storage and lifecycle support,
but also a consistent and easy-to-consume set of capabilities required to achieve scale and
resilience - load balancing, failover, dynamic resource management and allocation.
</p>
<p>
This became Pi, the name alluding to the infinite number of digits in its decimal representation.
</p>
<p>
Pi is of course not alone in seeking to offer these capabilities. Rather than striving for
'magical' approach - 'just write your app, and we'll make it scale', the approach Pi takes
is to explicitly define a 'menu' of tradeoffs needed to achieve scale and resilience,
recommend patterns for implementing these tradeoffs, and provide APIs to facilitate
and guide that implementation. When scale and resilience matter, abstractions should be a
transparent means of convenience rather than opaquely hiding details that need to be
understood as the system evolves.
</p>
<p>
The first set of services developed on Pi, named Pi Cloud (no relation to PiCloud!)
provided de-facto standard infrastructure-as-a-service (IaaS) compute and storage
services API-compatible with Amazon's EC2 and S3. Much of what follows took shape
over the course of building and running this IaaS platform.
</p>
<h2>
Anatomy of Pi
</h2>
<p>
The foundations of Pi's distributed, decentralized architecture, borrow heavily from peer-to-peer
and specifically distributed hashtable (DHT) paradigms, and several successful decentralised
Internet-scale deployments, most notably pre-Microsoft Skype and BitTorrent.
</p>
<p>
Pi nodes - each with its own unique identifier - form a ring of peers (a structured p2p
overlay), with each node tracking a set of peers. A core appeal of this organisation is that
it allows one node to route a message to another via a bounded number of intermediate nodes; thus by
mapping resources (such as data or services) to identifiers, nodes can locate and access
those resources without having to have specific knowledge (addresses, routing etc) of how to access them.
</p>
<p>
The initial implementation of the structured p2p overlay / DHT was provided by by the FreePastry
library, which takes care of organising nodes into a ring, providing message routing and delivery,
and exposes events relating to messaging and node lifecycle to applications.
</p>
<p>
Above Pastry sits Pi Core, the p2p application framework that provides developer-friendly APIs for
application creation, messaging, data access and management, and much more. It also abstracts away
the specific DHT implementation provided by Pastry.
</p>
<p>
Pi Cloud services are implemented as a series of Pi applications, starting with a shared module that
defines the shared data entities, followed by applications for network, volume, image, storage and
management, and finally the API application which exposes the APIs. These applications are then packaged
together to form a deployable artifact – in the case of Pi Cloud, a RPM package - to be deployed on
each server node, and run in a single JVM instance.
</p>
<p>
Pi is Java-based, with Pi Cloud featuring a small number of platform-specific aspects,
largely around networking for machine instances. These currently work with CentOS / RedHat Linux.
</p>
<p>
The initial release of Pi Cloud was deployed on Rocks Clusters, and uses Parascale and Gluster as
the underlying distributed storage substrate.
</p>
<h2>
Developing Pi Applications
</h2>
<p>
Developers coming to Pi with a background in prevalent server-side request / transaction
processing programming models typically find they have to 'get their head around' several
core concepts that may well be familiar in a different context.
These commonly include:
</p>
<ul>
<li>
<b>Processing Paradigm</b> - It is typically helpful to think of the processing model
used by Pi as workflow-based rather than request-response based. Pi uses message passing,
however a consequence of its resilience needs is that any notion of parking messages on a
single (by definition unreliable) node for forwarding later will not do. Hence operations
that cannot be processed immediately, have a long processing time, or need to be handed
off to a remote peer for processing are represented as data, stored in the DHT at the
earliest opportunity, and their progress and completion is monitored by multiple peers
(see further below). In terms of how such operations are modelled, typical handling is
through a series of steps, each of which can, as just described, be monitored to ensure successful
completion and handoff to the next step. Request-response type interactions are of course
possible under this paradigm, and Pi provides correlation identifiers and relevant helper APIs.
</li>
<li>
<b>Asynchronous Programming Model</b> - Other than a small number of notable exceptions,
operations implemented within Pi follow the asynchronous, non-blocking paradigm that is
very similar to that used by libraries such as node.js and Event Machine, and browser
programming models used by Javascript and Actionscript. Results, events or errors are
returned via callbacks. IO and other events events are delivered to applications from a single
worker / event thread, so need to be processed rapidly, without blocking, with any processing
intensive activities needing to be explicitly done in a separate thread from a thread pool.
In Pi this is currently less than elegant due to usual clumsiness involved in implementing
anonymous methods and closures in Java; also due to a level of reliance on the developer's
ability to categorise tasks as 'fast enough' to be executed in the event thread (what if
the node itself is under load and things slow down?) or requiring a dip into the thread pool.
</li>
<li>
<b>Data Storage</b> - Pi sits atop a distributed hashtable (DHT), acting as a distributed key-value
data store. There are no locks and no transactions. Concurrency is managed using an optimistic model
- we read a record, update it, write it; if another write happened between us reading the record
and writing it, we repeat the read-update-write process. This provides massive scalability.
Tradeoffs are of course around data consistency and expressiveness of modeling data relationships.
<br/>
With Pi, a set of Pi applications (forming one or more services) will typically share a data model.
It is important to ensure that Pi data entities are robust in the presence of disparate versions
of Pi applications across nodes. Current version of Pi puts some of this burden on the developer.
It also provides support for non-strict binding of JSON to Java objects, allowing the developer to
be explicit at a fine-grained (attribute) level about how liberal this binding is, e.g. what happens
if a node encounters an unknown field. Developers can also choose to version entities and implement
'dual writes' (both 'old' and 'new' version, with 'old' eventually retired) when needing to
implement changes to structure of data entities.
</li>
<li>
<b>Concurrency</b> - Pi has no locks and no transactions. For managing concurrency around single-record
updates, optimistic concurrency is used as described above. For updates across multiple records,
each record is updated separately. Failure is handled through a combination of compensation,
watching (see the watcher pattern) and inconsistency resolving logic, as appropriate.
</li>
<li>
<b>Idempotence</b> - Operations in Pi should in general be idempotent. For instance, allocation of a
resource to a consumer should succeed if that resource has already been allocated to that consumer.
This makes applications more robust and enables recovery mechanisms based on re-running failing tasks.
It requires correct modelling (workflows, state machines etc) of operations by the system designer.
</li>
<li>
<b>Polling and Eventing</b> - Pi applications often use both polling and eventing around a particular task
as a means of achieving resilience and failure recovery. Usually eventing is used to ensure
tasks are processed eagerly, while polling is used as fallback in the event of failure.
</li>
<li>
<b>Application Coupling</b> - P2P applications that together comprise a whole system require
design choices to be made around the level of coupling between those apps. Established design principles
from SOA and client-server around loose coupling, abstraction and responsibilities need to be supplemented
with the changes that P2P introduces - an inherently deliberately distributed nature of the whole
system and its apps means that it can make sense to tightly couple applications that together belong to a
single system, in the sense of sharing knowledge of and responsibility for a resource, a piece of DHT
data or a function. Of course, good practices around the design of local versus remote interfaces and
interactions continue to apply, as does the need to ensure robustness against having disparate versions
of each application deployed on each system node.
</li>
</ul>
<h2>Patterns of Pi</h2>
<h3>1. Ring Creation and Initialisation</h3>
<h4>
Ring Initiator
<i>(How does a node start a new Pi ring?)</i>
</h4>
<p>
In Pi, system nodes are peers that form a DHT in a ring structure, enabling routing to and location of
resources. To join a ring, a Pi node goes through a bootstrapping step that starts with contacting a
node that is already in that ring. This allows it to discover its place in the ring (based on its
randomly generated 160-bit node ID), and populate its initial list of known peers.
</p>
<p>
Pi currently supports peer discovery based on a list of known hostnames or IP addresses, with
intention to also support broadcast and other mechanisms.
</p>
<p>
When started, all nodes attempt to join an existing Pi ring, by using one or more available
peer discovery mechanisms (see below). If this fails or times out, a node will by default not start
a new ring.
</p>
<p>
In order to create a new ring, a configuration flag that normally prevents nodes from starting new
rings needs to be disabled. This allows a node to start a new ring. Once a node has started a new
ring, other nodes can bootstrap off of it, inheriting its properties where applicable (for instance,
region and availability zone in the case of Pi Cloud).
</p>
<h4>
Seeder
<i>(How does a DHT get initialised with required data?)</i>
</h4>
<p>
Some applications require presence of one or more data items in the DHT in order to function.
Although it is possible to have applications create required data lazily if attempts to read them
fail, this makes it difficult to distinguish between data that becomes inaccessible due to a fault,
and runs the risk of creating multiple copies of the same entity.
</p>
<p>
Some examples of data items that may need seeding include entities used to determine whether or not
an application should be active on a certain node, records used for resource allocation, and task queue
records.
</p>
<p>
A seeder is simply a service that allows such records to be created in the DHT when a new ring has been
started. It usually performs a series of DHT store operations, using configuration data specified via the
service interface.
</p>
<p>
In addition, a seeder may be used to initialise a new subset of nodes within an existing ring (for
instance to create configuration for a specific environment or availability zone, for use by nodes
within that environment), or to alter the settings of a previously seeded record.
</p>
<h3>2. Applications</h3>
<h4>
Pi Application
<i>(How do I create a Pi application?)</i>
</h4>
<p>
Developers simply extend the appropriate Pi application base class. There is a 'vanilla' Pi
application base class, as well as its extensions if additional functionality is
required - for instance:
</p>
<ul>
<li>
pub/sub support (anycast / broadcast sending and receiving)
</li>
<li>
external access to one or more services (assigning an address from a pool of addresses to
active applications, and opening / closing access via iptables)
</li>
<li>
etc.
</li>
</ul>
<p>
Then define application activation strategy (see below) and implement handlers for all or some of the following:
</p>
<ul>
<li>application startup / shutdown</li>
<li>message delivery and forward</li>
<li>pub/sub events, such as receipt of anycast or broadcast messages</li>
<li>application activation / passivation (becomeActive / becomePassive)</li>
<li>handling of peer list updates (in effect node arrival / departure events)</li>
</ul>
<p>
The current implementation of Pi recommends making each application a Spring bean, and having Spring
inject required dependencies.
</p>
<h4>
Application Activator
<i>(How can I have an application be active on a subset of nodes in the p2p network?)</i>
</h4>
<p>
Some applications will want to run only on a subset of nodes. Examples might include
applications that expose a service on one of N IP addresses, applications that perform some
intensive processing, or those that aggregate information.
</p>
<p>
An Application Activator provides a strategy that is used to determine whether or not
an application should be active on a particular node. The meaning of 'active' and 'passive'
are entirely application specific - indeed it is not uncommon in Pi for 'passive' applications
to still perform functions like monitoring and routing messages that pass through them.
</p>
<p>
Pi currently comes with three activators:
</p>
<ul>
<li>
the simplest is the <b>always-on activator</b>, which basically makes all application instances active
</li>
<li>
the <b>shared DHT record activator</b> can be used to specify activation of N applications, each optionally
acquiring one of N system resources upon activation. Activation state is stored in a DHT record which is
polled by applications, hence care must be taken to set polling frequency according to the overall
number of candidate nodes
</li>
<li>
the <b>supernode activator</b> will activate one of N applications. It differs from the shared DHT record
activator in that activation is determined by whether or not a node is nearest to one or more of a
set of predetermined points in node id space (in other words, whether a node's id is closest to
one of a set of IDs determined by a function stored in the DHT).
</li>
</ul>
<p>
It is additionally possible to specify basic exclusion rules between applications, such that an
application is prevented from becoming active if some other specific application is already active.
This may be desirable if, for instance, two applications may conflict in some way if run on the same
node, or place high load on compute, network and / or storage resources on that node.
<p>
<h4>
Exposed Service
<i>(How can I expose a service to external consumers?)</i>
</h4>
<p>
Pi can automatically manage the exposure of an application via a set of IP addresses, across a range
of nodes. This is achieved by extending the base class that provides managed addressing capabilities
for a Pi application, selecting the shared DHT record application activator, and seeding the relevant
application record with appropriate addresses in the DHT.
</p>
<h3>
3. Data
</h3>
<h4>
Distributed Hashtable (DHT)
<i>(How do I store and retrieve persistent data in Pi?)</i>
</h4>
<p>
Pi provides an implementation of a DHT, acting as a distributed key-value store.
Currently this is optimised for storage and retrieval of relatively 'small' data records.
Intention was to also provide support for <b>append</b> operations on entries, with a view to
supporting easy and efficient gathering of large amounts of data for later indexing or processing.
</p>
<p>
Pi DHT provides the following operations:
</p>
<ul>
<li>
<b>put</b>, which replaces an existing record with a new one, just like a traditional
hashtable put operation
</li>
<li>
<b>get</b>, which, given a key, reads and returns the record stored against it - again this is
analogous to the get operation on a hashtable
</li>
<li>
<b>update</b>, which reads a record, delegates its update to the application, then writes the
updated record using optimistic concurrency control. This means that the logic that
updates an existing record may be invoked more than once, typically if a different update,
perhaps from a different application or node, occurred between the original record being
read and the updated record being written.
</li>
</ul>
<p>
There is no <b>delete</b> operation due to the replicated and dynamic nature of a DHT. Applications
can however add a 'tombstone' flag to mark records as deleted and treat them as such.
</p>
<p>
Pi's DHT API comes in asynchronous and synchronous (blocking) flavours. Asynchronous should
be used wherever possible. Blocking DHT operations are provided for convenience, but should only
be used where there is certainty that they will never be invoked from the event worker thread.
An example would be aDHT operation invoked from a web server thread whilst processing a request.
</p>
<p>
In 'classic' DHT fashion, DHT records are stored on the node with node ID that is nearest to the
ID of the key. Records being written are typically replicated to one or more nodes adjacent (in
node ID space) to the nearest-ID node. How many replicas are written in total, and how many
replaces have to be written in order for a write to be deemed successful, are configuration
parameters of Pi that express a tradeoff between write availability and consistency.
</p>
<p>
Data records in the DHT are stored with a set of HTTP-like headers that represent metadata describing
the record's ID, its URI (and thus, via the scheme, the type of the stored resource), content type,
version and so forth.
</p>
<p>
In addition to using remote DHT APIs, each node can also 'scan' DHT data stored locally on that
node. This is very useful when data with certain properties - perhaps a record that has expired
in some way - needs to be found and acted upon in some way. There is also support for indexing local
resources of a given type, then publishing the result to some set of supernodes to provide runtime
querying, aggregation, search or reporting facilities. These local DHT access APIs are not used
for altering or updating data as this side-steps the ID-based scheme that drives data record location.
</p>
<h4>
Entity Identifiers and Modelling
<i>(How do I model, identify and locate domain-specific entities stored in Pi?)</i>
</h4>
<p>
Pi mandates the use of a common data model for application entities - naturally there is a need
for applications on different nodes to be able to communicate, share data, and understand that data.
Furthermore, in general a large P2P network will consist of different versions of applications,
hence applications have to be robust to this, and in some cases code for multiple versions of a
given entity, as already indicated under Data Storage in the section on Developing Pi Applications.
</p>
<p>
Pi entities are consequently modelled as Java objects that implement a
lightweight interface (PiEntity), serialised and stored as JSON. This provides a structured data
format, without imposing a rigid schema, allowing for a less rigid data processing model whilst
still allowing explicit structure that each application expects to be supported. Fine-grained
control over binding of JSON to Java is done through standard Java annotations.
</p>
<p>
Pi treats all instances of entities as resources, each with its own URI. The scheme of the URI
represents the type of the resource.
</p>
<p>
For Pi Cloud, there is an explicit mapping to Pi / DHT ID for a resource from the combination
of the URI and the scope of that resource (availability zone, region or global). An ID generator
function is thus provided to return the correct ID given this scope and identifier information.
</p>
<h4>
Searching and Indexing
<i>(How do I aggregate data, or find data matching certain criteria?)</i>
</h4>
<p>
Pi's decetralised nature and support for basic data access operations only lead to several
techniques for indexing, search, discovery and reporting, with required effort levels growing
with dataset size.
</p>
<p>
For small, limited-size data sets, a single DHT record can be used, and
can simply be searched or processed to generate reports.
</p>
<p>
Taking this a step further, a finite, known set of DHT records can be used to store data,
and these can be indexed periodically to create / update a set of index records that can
be used for searches. There will be a latency associated with indexed data.
</p>
<p>
For queries that are known at development time, a search through records on the local node
can be used, with the discovery of a matching record triggering some action.
</p>
<p>
Where the query is not known until runtime, local DHT store can be examined, and data on any
records of interest published to a set of supernodes for aggregation and querying.
</p>
<p>
Finally, where latency is not an issue, DHT records can be batch-processed for reporting,
indexing and other purposes.
</p>
<p>
These are all techniques that have been used with existing Pi applications. This list is
certainly not exhaustive however.
</p>
<h4>
Caching
<i>(How do I ensure DHT lookup results are reused?)</i>
</h4>
<p>
DHT reads and writes can be expensive, hence caching becomes important early on. Pi
provides blocking and non-blocking DHT cache access APIs, with both read-through and write-through
capabilities. These should be used with the same constraints already described around the use of
blocking and non-blocking DHT access APIs, with non-blocking preferred wherever possible.
</p>
<p>
Pi's current cache implementation is backed by Ehcache.
</p>
<h3>
4. Messaging
</h3>
<h4>
Message Passing and Routing
<i>(How do I send a message from one node to another?)</i>
</h4>
<p>
Pi makes it easy to send messages from one node to another. All that is required is a
destination ID, which can be computed from a resource as described in the Data section and
provides a convenient location mechanism without requiring knowledge of the resource's address.
The message will be delivered to the node whose node ID is the nearest to this destination ID.
</p>
<p>
Pi messages typically consist of a destination ID, method (a CRUD verb) and a Pi entity being
sent. Optionally, a message can be sent to an application that is different from the sending
application. Messages additionally contain elements that are generally not visible to the application,
above all a unique message identifier, a correlation identifier for matching responses with requests
(where applicable), and a transaction ID used to group related messages together.
</p>
<p>
The Pi implementation provides convenience
Messages are sent through a message context. A new message context can be created trivially
from a message context factory (an interface implemented by all Pi applications), or a message
context of a received message can be used to send a new message. The latter approach has the
advantage of keeping a consistent transaction ID between messages - thus related log messages
across a series of nodes can easily be identified.
</p>
<p>
This structure can be altered or replaced if required.
</p>
<p>
Note that Pi messaging is inherently unreliable. Although nodes do have a level of routing logic
that improves robustness under node churn, sent messages are not guaranteed to arrive at their
destination. Other mechanisms need to be used to ensure reliability.
</p>
<h4>
Publish-subscribe
<i>(How can a set of nodes advertise a service or interest?)</i>
</h4>
<p>
Pi supports the standard publish-subscribe paradigm. By leveraging properties of the p2p overlay,
it is entirely decentralised and able to support them in a massively scalable manner. Topics are
mapped to Pi IDs. Subscribers to a topic join that topic by adding themselves to a tree of
subscribers, rooted at the node closest to the topic ID. Nodes wishing to publish to a topic do
so by sending a message toward the topic ID; this is intercepted and handled en route by the
first node to subscribed to that topic. If there are no subscribers, the message terminates at the
topic ID node, where the 'no subscribers' condition can be handled.
</p>
<p>
All of this is abstacted from Pi applications, which only have to worry about declaring that they
wish to subscribe to or unsubscribe from a topic, and optionally handling the 'no subscribers' case.
</p>
<h4>
Anycast
<i>(How do I send a message to one of a series of interested recipients?)</i>
</h4>
<p>
Anycast involves sending a message to a topic, such that the message is processed sequentially,
one node at a time. A node that receives an anycast message may process it without forwarding it
onto other topic subscribers, alter and / or process it partially before forwarding, or forward it
on without processing.
</p>
<p>
Anycast messages can be sent 'directly' or via a randomly chosen intermediate node. The latter is
useful when wanting to ensure that anycast messages sent from a particular node do not traverse the
topic tree the same way, providing better distribution of messages across subscribers.
</p>
<p>
Among other uses, anycast provides an effective, decentralised mechanism for finding nodes that can
handle a certain request or supply a certain resource.
</p>
<h4>
Broadcast
<i>(How do I send a message to a group of interested recipients?)</i>
</h4>
<p>
An application can broadcast a message to a topic. Subscribers to a given topic will distribute
the message to all known peers that have also subscribed to the topic. The message is in effect
distributed throughout the topic node tree in parallel, making broadcast efficient.
</p>
<h3>5. Resource Management</h3>
<h4>
Consumed Resource Registry
<i>(How do I track the usage of a resource that may have multiple consumers and / or require
co-ordination across nodes?)</i>
</h4>
<p>
Sometimes distributed applications, or different applications running on the same node, need to
manage the lifeycle of an resource, creating it on demand and destroying it when no longer
needed. This becomes particularly important when that resource is scarce, consumes other resources,
or when duplicate instances of the same resource are problematic. Examples from Pi Cloud include
network security group NATing (used by apps / VMs on multiple nodes) and network setup for VMs
(used by multiple apps).
</p>
<p>
Pi provides a simple 'consumed resource registry' component, which allow applications to track
how many consumers a particular resource has, and thus determine when a resource needs to be
created (first consumer registered) or removed (last consumer deregistered).
</p>
<p>
There is also an extended resource registry implementation, a 'cached consumed DHT resource
registry'. This is appropriate when a resource is used across multiple nodes (as in the security
group scenario above). In this case, an entry in the DHT is used to track resource state.
Apps update and watch the DHT record to reflect resource lifecycle and if required track its
consumers. Each node tracks changes to that resource state via a resource
refresh strategy, for instance enabling the triggering of a periodic refresh of the resource
through reloading the DHT record, and allows specific actions to be performed upon refresh.
For fault tolerance, the semantics of the implementation also allow each consumer of a resource
to itself be probed (ping / heartbeats) to ensure that particular consumer still exists.
Cached resource records can be retrieved on demand, as can a list of consumers for a specific
resource, and a list of resources of a certain type. This is essential when applications need to track
state in memory, avoiding the need for frequent DHT operations. When a resource needs to be
allocated to a unique owner node, this is typically based on node ID proximity to the ID of the
resource name hash.
</p>
<h4>
Subscriber Resource Allocation
<i>(How do I allocate resources available across a set of nodes?)</i>
</h4>
<p>
A highly scalable, decentralised scheme for resource allocation involves nodes that possess
the given resource advertising its availability by subscribing to a topic. A client wishing
to utilise that resource sends an anycast message to the topic. This is handled by one or more
recipient nodes, which allocate the resource as appropriate.
</p>
<h3>
6. Fault Tolerance
</h3>
<h4>
Task Processing Queues
<i>(How do I capture a task or operation so that it can be retried if unsuccessful?)</i>
</h4>
<p>
Pi provides a means of storing details of pending operations (tasks) which can be carried out in
an idempotent fashion, and hence retired if a previous attempt to execute had an unsuccessful
or unknown outcome.
</p>
<p>
A Pi application can define one or more task processing queues, usually through seeding underlying
DHT records. When a task needs to be performed, it is added to the appropriate queue. Instances
of that same application monitor the queue for any tasks that have not been picked up or have
timed out (become stale) so can be retried. A maximum number of retries can be set before tasks
are reported as failed.
</p>
<p>
If the initial (or subsequent) execution of a task succeeds, the corresponding queue item is
removed. If, however, it fails, the polling mechanism will ensure it gets retried.
</p>
<h4>
Watchers / Supervisors
<i>(How do I detect and resolve anomalies resulting from failures or malfunctions?)</i>
</h4>
<p>
Pi applications can implement watcher components to monitor and rectify issues - anything
from misconfiguration of the local node to data inconsistencies to consumer state monitoring.
Pi provides a watcher service, which allows application-specific watchers to be registered for
periodic execution. The granularity and frequency of each watcher can be whatever is appropriate
for each application.
</p>
<h4>
Eager Node Failure Recovery
<i>(How do I detect and act on a node failure in a timely manner?)</i>
</h4>
<p>
Pi delivers node arrival and departure events to applications. Among other things, this can
be used to detect node failure, reported as a departure event, and trigger appropriate action.
This can mean eagerly activating an existing recovery mechanism, for instance forcing application
activation checks, picking up a task from a task queue, or sending an anycast message to a set of
subscribers capable of taking over from the departed node.
</p>
TODO: tooling, experience
- 60,000 LoC, 110,000 tests
- worked! blown away by demo of pulling cable out
- development model
- DHT structure
- what constitutes an outage - various levels - network, infra etc - all about health checks
- skipnet
- tooling
- logging / trobleshooting
- subset would be nice - groups / spread
- cost of ownership / support
- releases cf FB
- nauman - cricket
- cap theorem
- orphaned nodes - shutting down, state machine approach
- user DB (divided opinions - not unlike skype before MS
- processing - dd, instance creation - nice only helped so much, needed to queue / co-ordinate in app
- networking - back and forward network
we never had cascading outage (apart from that time we blew up JVMs across the cluster - but that was us :))
functional, state machines
p2p absolutely does not absolve one of responsibility to consider system properties
Refs
- pastry, kademlia
- commutative data types
- promise of gossip
- pss / t-man
- network - layer 2
- skipnet
- microbursts
- CRDTs
- dynamo
- skype
- bit torrent?
</div>
</section>
<footer>
<p><small>Hosted on GitHub Pages — Theme by <a href="https://github.com/orderedlist">orderedlist</a></small></p>
</footer>
</div>
<script src="javascripts/scale.fix.js"></script>
</body>
</html>