-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathTestScript4.py
More file actions
56 lines (44 loc) · 1.77 KB
/
TestScript4.py
File metadata and controls
56 lines (44 loc) · 1.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
from csw.Parameter import stringKey
from csw.ParameterSetType import Setup
from sequencer.Script import Script
def script(ctx: Script):
    """Register this test script's sequence and command handlers on ``ctx``.

    Every handler is attached through one of the ``ctx.on...`` decorators.
    The function itself returns nothing.
    """

    @ctx.onNewSequence()
    async def handleNewSequence():
        # Publish a "Started" marker event, then pause for half a second.
        startedParam = stringKey("onNewSequence").set("Started")
        startedEvent = ctx.SystemEvent(
            "LGSF.darknight",
            "NewSequenceHandler",
            startedParam,
        )
        await ctx.publishEvent(startedEvent)
        await asyncio.sleep(0.5)

    @ctx.onSetup("command-1")
    async def handleCommand1(_: Setup):
        # The incoming Setup is ignored; only a "Started" event is published.
        command1Param = stringKey("sequence-command-1").set("Started")
        command1Event = ctx.SystemEvent(
            "LGSF.darknight",
            "command1",
            command1Param,
        )
        await ctx.publishEvent(command1Event)

    @ctx.onSetup("command-lgsf")
    async def handleCommandLgsf(_: Setup):
        # Deliberately does not update the command response: the handler
        # only sleeps, to keep the sequencer from finishing so that other
        # commands get time to run.
        await asyncio.sleep(1.0)

    @ctx.onAbortSequence()
    async def handleAbortSequence():
        # Placeholder for real abort actions; just report success.
        abortedEvent = ctx.SystemEvent("TCS.test", "abort.success")
        await ctx.publishEvent(abortedEvent)

    @ctx.onStop()
    async def handleStop():
        # Placeholder for real stop actions; just report success.
        stoppedEvent = ctx.SystemEvent("TCS.test", "stop.success")
        await ctx.publishEvent(stoppedEvent)
# XXX: the "time-service-dsl" setup command is not referenced anywhere; its handler is kept commented out below.
# onSetup("time-service-dsl") {
# val offset = utcTimeAfter(2.seconds).offsetFromNow()
#
# schedulePeriodically(utcTimeAfter(5.seconds), offset) {
# publishEvent(SystemEvent("LGSF.test", "publish.success"))
# }
#
# scheduleOnce(taiTimeNow()) {
# publishEvent(SystemEvent("LGSF.test", "publish.success"))
# }
# }
# }