Skip to content

Commit 6b21e58

Browse files
committed
Add one-shot mode for sealed execution
Sealed execution allows only one request to execute before the process exits, causing a restart, for a fresh process. With SlicerVM, this means a hardware per request barrier. Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
1 parent 67a6309 commit 6b21e58

File tree

6 files changed

+209
-9
lines changed

6 files changed

+209
-9
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ Environmental variables:
205205
| `log_call_id` | In HTTP mode, when printing a response code, content-length and timing, include the X-Call-Id header at the end of the line in brackets i.e. `[079d9ff9-d7b7-4e37-b195-5ad520e6f797]` or `[none]` when it's empty. Default: `false` |
206206
| `max_inflight` | Limit the maximum number of requests in flight, and return a HTTP status 429 when exceeded |
207207
| `mode` | The mode which of-watchdog operates in, Default `streaming` [see doc](#3-streaming-fork-modestreaming---default). Options are [http](#1-http-modehttp), [serialising fork](#2-serializing-fork-modeserializing), [streaming fork](#3-streaming-fork-modestreaming---default), [static](#4-static-modestatic) |
208+
| `one_shot` | When set to `true`, accept the first genuine invoke request, then immediately begin graceful shutdown and reject subsequent invoke requests. Readiness and health endpoints do not trigger this mode. |
208209
| `port` | Specify an alternative TCP port for testing. Default: `8080` |
209210
| `prefix_logs` | When set to `true` the watchdog will add a prefix of "Date Time" + "stderr/stdout" to every line read from the function process. Default `true` |
210211
| `read_timeout` | HTTP timeout for reading the payload from the client caller (in seconds) |
@@ -214,11 +215,12 @@ Environmental variables:
214215
| `upstream_url` | Alias for `http_upstream_url` |
215216
| `write_timeout` | HTTP timeout for writing a response body from your function (in seconds) |
216217

218+
`shutdown_after_first_request` remains accepted as a deprecated alias for `one_shot`.
219+
217220
Unsupported options from the [Classic Watchdog](https://github.com/openfaas/classic-watchdog):
218221

219222
| Option | Usage |
220223
| -------------------- | --------------------------------------------------------------------------------------------- |
221224
| `write_debug` | In the classic watchdog, this prints the response body out to the console |
222225
| `read_debug` | In the classic watchdog, this prints the request body out to the console |
223226
| `combined_output` | In the classic watchdog, this returns STDOUT and STDERR in the function's HTTP response, when off it only returns STDOUT and prints STDERR to the logs of the watchdog |
224-

config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ type WatchdogConfig struct {
6969
// HTTP mode.
7070
LogCallId bool
7171

72+
// OneShot marks the watchdog as unhealthy and begins graceful shutdown
73+
// when the first genuine invoke request starts.
74+
OneShot bool
75+
7276
// Handler is the HTTP handler to use in "inproc" mode
7377
Handler http.HandlerFunc
7478
}
@@ -178,6 +182,7 @@ func New(env []string) (WatchdogConfig, error) {
178182
LogBufferSize: logBufferSize,
179183
ReadyEndpoint: envMap["ready_path"],
180184
LogCallId: logCallId,
185+
OneShot: getBool(envMap, "one_shot") || getBool(envMap, "shutdown_after_first_request"),
181186
}
182187

183188
if val := envMap["mode"]; len(val) > 0 {

config/config_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,3 +389,61 @@ func Test_NonParsableString_parseIntOrDurationValue(t *testing.T) {
389389
t.Error(fmt.Sprintf("want: %q got: %q", want, got))
390390
}
391391
}
392+
393+
func TestNewParsesOneShot(t *testing.T) {
394+
cfg, err := New([]string{
395+
"fprocess=/bin/cat",
396+
"mode=streaming",
397+
"one_shot=true",
398+
})
399+
if err != nil {
400+
t.Fatalf("expected config to parse, got error: %v", err)
401+
}
402+
403+
if !cfg.OneShot {
404+
t.Fatalf("expected OneShot to be true")
405+
}
406+
}
407+
408+
func TestNewParsesDeprecatedShutdownAfterFirstRequestAlias(t *testing.T) {
409+
cfg, err := New([]string{
410+
"fprocess=/bin/cat",
411+
"mode=streaming",
412+
"shutdown_after_first_request=true",
413+
})
414+
if err != nil {
415+
t.Fatalf("expected config to parse, got error: %v", err)
416+
}
417+
418+
if !cfg.OneShot {
419+
t.Fatalf("expected deprecated alias to enable OneShot")
420+
}
421+
}
422+
423+
func TestNewParsesOneShotNumericValues(t *testing.T) {
424+
cfg, err := New([]string{
425+
"fprocess=/bin/cat",
426+
"mode=streaming",
427+
"one_shot=1",
428+
})
429+
if err != nil {
430+
t.Fatalf("expected config to parse, got error: %v", err)
431+
}
432+
433+
if !cfg.OneShot {
434+
t.Fatalf("expected one_shot=1 to enable OneShot")
435+
}
436+
437+
cfg, err = New([]string{
438+
"fprocess=/bin/cat",
439+
"mode=streaming",
440+
"one_shot=0",
441+
})
442+
if err != nil {
443+
t.Fatalf("expected config to parse, got error: %v", err)
444+
}
445+
446+
if cfg.OneShot {
447+
t.Fatalf("expected one_shot=0 to disable OneShot")
448+
}
449+
}

pkg/watchdog.go

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pkg
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"log"
@@ -12,6 +13,7 @@ import (
1213
"os/signal"
1314
"path/filepath"
1415
"strings"
16+
"sync"
1517
"sync/atomic"
1618
"syscall"
1719
"time"
@@ -50,8 +52,25 @@ func (w *Watchdog) Start(ctx context.Context) error {
5052
baseFunctionHandler := buildRequestHandler(w.config, w.config.PrefixLogs)
5153
requestHandler := baseFunctionHandler
5254

55+
drainCh := make(chan string, 1)
56+
var drainOnce sync.Once
57+
startDrain := func(reason string) {
58+
drainOnce.Do(func() {
59+
if err := markUnhealthy(); err != nil {
60+
log.Printf("Unable to mark server as unhealthy: %s\n", err.Error())
61+
}
62+
63+
log.Printf("Scheduling graceful shutdown: %s\n", reason)
64+
drainCh <- reason
65+
})
66+
}
67+
68+
if w.config.OneShot {
69+
requestHandler = makeOneShotHandler(requestHandler, w.config.ReadyEndpoint, startDrain)
70+
}
71+
5372
if w.config.JWTAuthentication {
54-
handler, err := makeJWTAuthHandler(w.config, baseFunctionHandler)
73+
handler, err := makeJWTAuthHandler(w.config, requestHandler)
5574
if err != nil {
5675
return fmt.Errorf("error creating JWTAuthMiddleware: %w", err)
5776
}
@@ -111,6 +130,7 @@ func (w *Watchdog) Start(ctx context.Context) error {
111130
w.config.HealthcheckInterval,
112131
w.config.HTTPWriteTimeout,
113132
w.config.SuppressLock,
133+
drainCh,
114134
&httpMetrics)
115135

116136
return nil
@@ -122,32 +142,46 @@ func markUnhealthy() error {
122142
path := filepath.Join(os.TempDir(), ".lock")
123143
log.Printf("Removing lock-file : %s\n", path)
124144
removeErr := os.Remove(path)
145+
if errors.Is(removeErr, os.ErrNotExist) {
146+
return nil
147+
}
125148
return removeErr
126149
}
127150

128-
func listenUntilShutdown(s *http.Server, shutdownCtx context.Context, healthcheckInterval time.Duration, writeTimeout time.Duration, suppressLock bool, httpMetrics *metrics.Http) error {
151+
func listenUntilShutdown(s *http.Server, shutdownCtx context.Context, healthcheckInterval time.Duration, writeTimeout time.Duration, suppressLock bool, drain <-chan string, httpMetrics *metrics.Http) error {
129152

130153
idleConnsClosed := make(chan struct{})
131154
go func() {
132155
sig := make(chan os.Signal, 1)
133156
signal.Notify(sig, syscall.SIGTERM)
134157

135158
reason := ""
159+
drainDelay := healthcheckInterval
136160

137161
select {
138162
case <-sig:
139163
reason = "SIGTERM"
140164
case <-shutdownCtx.Done():
141165
reason = "Context cancelled"
166+
case reason = <-drain:
167+
drainDelay = 0
142168
}
143169

144-
log.Printf("%s: no new connections in %s\n", reason, healthcheckInterval.String())
170+
if drainDelay > 0 {
171+
log.Printf("%s: no new connections in %s\n", reason, drainDelay.String())
172+
} else {
173+
log.Printf("%s: no new connections allowed immediately\n", reason)
174+
}
145175

146176
if err := markUnhealthy(); err != nil {
147177
log.Printf("Unable to mark server as unhealthy: %s\n", err.Error())
148178
}
149179

150-
<-time.Tick(healthcheckInterval)
180+
if drainDelay > 0 {
181+
timer := time.NewTimer(drainDelay)
182+
defer timer.Stop()
183+
<-timer.C
184+
}
151185

152186
connections := int64(testutil.ToFloat64(httpMetrics.InFlight))
153187
log.Printf("No new connections allowed, draining: %d requests\n", connections)
@@ -193,6 +227,34 @@ func listenUntilShutdown(s *http.Server, shutdownCtx context.Context, healthchec
193227
return nil
194228
}
195229

230+
func makeOneShotHandler(next http.Handler, readyEndpoint string, startDrain func(string)) http.Handler {
231+
var served int32
232+
233+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
234+
if r.URL != nil {
235+
switch r.URL.Path {
236+
case "/_/health", "/_/ready":
237+
next.ServeHTTP(w, r)
238+
return
239+
case readyEndpoint:
240+
if readyEndpoint != "" {
241+
next.ServeHTTP(w, r)
242+
return
243+
}
244+
}
245+
}
246+
247+
if !atomic.CompareAndSwapInt32(&served, 0, 1) {
248+
w.WriteHeader(http.StatusServiceUnavailable)
249+
w.Write([]byte("watchdog is draining after serving a request"))
250+
return
251+
}
252+
253+
startDrain("one_shot")
254+
next.ServeHTTP(w, r)
255+
})
256+
}
257+
196258
func buildRequestHandler(cfg config.WatchdogConfig, prefixLogs bool) http.Handler {
197259
var requestHandler http.HandlerFunc
198260

pkg/watchdog_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package pkg
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"sync/atomic"
7+
"testing"
8+
)
9+
10+
func TestMakeOneShotHandlerDrainsAndRejectsSubsequentRequests(t *testing.T) {
11+
var calls int32
12+
var drains int32
13+
14+
handler := makeOneShotHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15+
atomic.AddInt32(&calls, 1)
16+
w.WriteHeader(http.StatusAccepted)
17+
w.Write([]byte("ok"))
18+
}), "", func(reason string) {
19+
atomic.AddInt32(&drains, 1)
20+
})
21+
22+
firstReq := httptest.NewRequest(http.MethodPost, "/", nil)
23+
firstRes := httptest.NewRecorder()
24+
handler.ServeHTTP(firstRes, firstReq)
25+
26+
if firstRes.Code != http.StatusAccepted {
27+
t.Fatalf("expected first request to pass through, got status %d", firstRes.Code)
28+
}
29+
30+
secondReq := httptest.NewRequest(http.MethodPost, "/", nil)
31+
secondRes := httptest.NewRecorder()
32+
handler.ServeHTTP(secondRes, secondReq)
33+
34+
if secondRes.Code != http.StatusServiceUnavailable {
35+
t.Fatalf("expected second request to be rejected, got status %d", secondRes.Code)
36+
}
37+
38+
if got := atomic.LoadInt32(&calls); got != 1 {
39+
t.Fatalf("expected next handler to be called once, got %d", got)
40+
}
41+
42+
if got := atomic.LoadInt32(&drains); got != 1 {
43+
t.Fatalf("expected drain to be scheduled once, got %d", got)
44+
}
45+
}
46+
47+
func TestMakeOneShotHandlerIgnoresReadyEndpoint(t *testing.T) {
48+
var calls int32
49+
var drains int32
50+
51+
handler := makeOneShotHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
52+
atomic.AddInt32(&calls, 1)
53+
w.WriteHeader(http.StatusOK)
54+
}), "/ready", func(reason string) {
55+
atomic.AddInt32(&drains, 1)
56+
})
57+
58+
readyReq := httptest.NewRequest(http.MethodGet, "/ready", nil)
59+
readyRes := httptest.NewRecorder()
60+
handler.ServeHTTP(readyRes, readyReq)
61+
62+
firstReq := httptest.NewRequest(http.MethodPost, "/", nil)
63+
firstRes := httptest.NewRecorder()
64+
handler.ServeHTTP(firstRes, firstReq)
65+
66+
if firstRes.Code != http.StatusOK {
67+
t.Fatalf("expected first real request to pass through, got status %d", firstRes.Code)
68+
}
69+
70+
if got := atomic.LoadInt32(&calls); got != 2 {
71+
t.Fatalf("expected handler to be called for readiness and first invoke, got %d", got)
72+
}
73+
74+
if got := atomic.LoadInt32(&drains); got != 1 {
75+
t.Fatalf("expected drain to be scheduled once for the real request, got %d", got)
76+
}
77+
}

vendor/modules.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ github.com/docker/go-units
1010
# github.com/golang-jwt/jwt/v5 v5.3.0
1111
## explicit; go 1.21
1212
github.com/golang-jwt/jwt/v5
13-
# github.com/google/uuid v1.6.0
14-
## explicit
15-
# github.com/gorilla/mux v1.8.1
16-
## explicit; go 1.20
1713
# github.com/kylelemons/godebug v1.1.0
1814
## explicit; go 1.11
1915
github.com/kylelemons/godebug/diff

0 commit comments

Comments
 (0)