Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ jobs:
run: just qa
shell: bash
- name: Check if there are any uncommitted changes
run: |
git diff --exit-code
run: just check
shell: bash
- name: unittests
run: just unittest
Expand Down
15 changes: 0 additions & 15 deletions cmd/hestia/main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,3 @@
// .*@mycompany\.com MY COMPANY 2025
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
Expand Down
49 changes: 49 additions & 0 deletions cmd/hestiaservice/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"fmt"
"os"

"github.com/eccles/hestia/httpserver"
"github.com/eccles/hestia/logger"
"github.com/eccles/hestia/services/hestia"
"github.com/eccles/hestia/startup"
)

const (
serviceName = "hestiat"
)

func main() {
startup.Run(serviceName, run)
}

func run(log logger.Logger) error {
port, ok := os.LookupEnv("PORT")
if !ok {
err := fmt.Errorf("required environment variable is not defined: %s", "PORT")
log.Info(err.Error())
return err
}

service := hestia.New(
serviceName,
log,
&hestia.Config{},
)
defer service.Close()

h := httpserver.New(
log,
serviceName,
port,
service.Mux(),
)

s := startup.NewListeners(
log,
serviceName,
startup.WithListeners(h),
)
return s.Listen() // blocks until either one listener fails or sigterm is received.
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 109 additions & 0 deletions grpchealth/grpchealth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package grpchealth

import (
"context"
"sync"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/health/grpc_health_v1"
)

const (
livenessServiceName = "liveness"
readinessServiceName = "readiness"
)

type HealthCheckingService struct {
grpc_health_v1.UnimplementedHealthServer
sync.RWMutex
healthStatus map[string]grpc_health_v1.HealthCheckResponse_ServingStatus
log Logger
}

func New(log Logger) HealthCheckingService {
return HealthCheckingService{
healthStatus: map[string]grpc_health_v1.HealthCheckResponse_ServingStatus{
livenessServiceName: grpc_health_v1.HealthCheckResponse_SERVING,
readinessServiceName: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
},
log: log,
}
}

func (s *HealthCheckingService) serving(service string) {
s.Lock()
defer s.Unlock()
s.healthStatus[service] = grpc_health_v1.HealthCheckResponse_SERVING
s.log.Info("Health set to 'SERVING': %s", service)
}

func (s *HealthCheckingService) notServing(service string) {
s.Lock()
defer s.Unlock()
s.healthStatus[service] = grpc_health_v1.HealthCheckResponse_NOT_SERVING
s.log.Info("Health set to 'NOT_SERVING': %s", service)
}

// Dead - changes status of service to dead.
func (s *HealthCheckingService) Dead() {
s.notServing(livenessServiceName)
}

// Live - changes status of service to alive.
func (s *HealthCheckingService) Live() {
s.serving(livenessServiceName)
}

// NotReady - changes status of service to not ready.
func (s *HealthCheckingService) NotReady() {
s.notServing(readinessServiceName)
}

// Ready - changes status of service to ready.
func (s *HealthCheckingService) Ready() {
s.serving(readinessServiceName)
}

// Check implements `service Health`.
func (s *HealthCheckingService) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest) (
*grpc_health_v1.HealthCheckResponse, error) {
s.RLock()
defer s.RUnlock()

// logger.Sugar.Debug("Health Check for '%s'", in.Service)
if in.Service == "" {
for _, v := range s.healthStatus {
// logger.Sugar.Debug("Health Check for '%s'-> '%s'", in.Service, v.String())
if v != grpc_health_v1.HealthCheckResponse_SERVING {
s.log.Info("Health Check '%s' is NOT SERVING: '%s'", in.Service, v.String())
return &grpc_health_v1.HealthCheckResponse{
Status: v,
}, nil
}
}
s.log.Info("Health Check '%s' is SERVING", in.Service)
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}
if stat, ok := s.healthStatus[in.Service]; ok {
s.log.Debug("Health Check '%s' is `%s'", in.Service, stat)
return &grpc_health_v1.HealthCheckResponse{
Status: stat,
}, nil
}
err := status.Error(codes.NotFound, "unknown service: "+in.Service)

s.log.Info("Health Check failed: %v", err)
return nil, err
}

func (s *HealthCheckingService) Watch(
in *grpc_health_v1.HealthCheckRequest,
w grpc_health_v1.Health_WatchServer,
) error {
s.log.Info("Health Check watch not supported")
return status.Error(codes.Unimplemented, "watch not supported")
}
7 changes: 7 additions & 0 deletions grpchealth/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package grpchealth

import (
"github.com/eccles/hestia/logger"
)

type Logger = logger.Logger
5 changes: 5 additions & 0 deletions grpcserver/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Package grpcserver provides a server instance for grpc communications.
//
// The server instance has a Listener interface so it can be started and stoped by the
// Listener subsystem. Additionally interceptors for various functionality such as
package grpcserver
147 changes: 147 additions & 0 deletions grpcserver/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package grpcserver

import (
"context"
"errors"
"fmt"
"net"
"strings"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_otrace "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

grpcHealth "google.golang.org/grpc/health/grpc_health_v1"

"github.com/eccles/hestia/grpchealth"
)

// so we dont have to import grpc when using this package.
type grpcServer = grpc.Server
type grpcUnaryServerInterceptor = grpc.UnaryServerInterceptor

type RegisterServer func(*grpcServer)

func defaultRegisterServer(g *grpcServer) {}

type GRPCServer struct {
name string
log Logger
listenStr string
health bool
healthService *grpchealth.HealthCheckingService
interceptors []grpcUnaryServerInterceptor
register RegisterServer
server *grpcServer
reflection bool
}

type GRPCServerOption func(*GRPCServer)

func WithAppendedInterceptor(i grpcUnaryServerInterceptor) GRPCServerOption {
return func(g *GRPCServer) {
g.interceptors = append(g.interceptors, i)
}
}

func WithPrependedInterceptor(i grpcUnaryServerInterceptor) GRPCServerOption {
return func(g *GRPCServer) {
g.interceptors = append([]grpcUnaryServerInterceptor{i}, g.interceptors...)
}
}

func WithRegisterServer(r RegisterServer) GRPCServerOption {
return func(g *GRPCServer) {
g.register = r
}
}

func WithoutHealth() GRPCServerOption {
return func(g *GRPCServer) {
g.health = false
}
}

func WithReflection(r bool) GRPCServerOption {
return func(g *GRPCServer) {
g.reflection = r
}
}

func tracingFilter(ctx context.Context, fullMethodName string) bool {
return fullMethodName != grpcHealth.Health_Check_FullMethodName
}

// New creates a new GRPCServer that is bound to a specific GRPC API. This object complies with
// the standard Listener service and can be managed by the startup.Listeners object.
func New(log Logger, name string, port string, opts ...GRPCServerOption) GRPCServer {
g := GRPCServer{
name: strings.ToLower(name),
listenStr: fmt.Sprintf(":%s", port),
register: defaultRegisterServer,
interceptors: []grpc.UnaryServerInterceptor{
grpc_otrace.UnaryServerInterceptor(grpc_otrace.WithFilterFunc(tracingFilter)),
grpc_validator.UnaryServerInterceptor(),
},
health: true,
}
for _, opt := range opts {
opt(&g)
}
server := grpc.NewServer(
grpc.UnaryInterceptor(
grpc_middleware.ChainUnaryServer(g.interceptors...),
),
)

g.register(server)

if g.health {
healthService := grpchealth.New(log)
g.healthService = &healthService
grpcHealth.RegisterHealthServer(server, g.healthService)
}

if g.reflection {
reflection.Register(server)
}

g.server = server
g.log = log.With("grpcserver", g.String())
return g
}

func (g *GRPCServer) String() string {
// No logging in this method please.
return fmt.Sprintf("%s%s", g.name, g.listenStr)
}

func (g *GRPCServer) Listen() error {
listen, err := net.Listen("tcp", g.listenStr)
if err != nil {
return fmt.Errorf("failed to listen %s: %w", g, err)
}

if g.healthService != nil {
g.healthService.Ready() // readiness
}

g.log.Info("Listen")
err = g.server.Serve(listen)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("failed to serve %s: %w", g, err)
}
return nil
}

func (g *GRPCServer) Shutdown(_ context.Context) error {
g.log.Info("Shutdown")
if g.healthService != nil {
g.healthService.NotReady() // readiness
g.healthService.Dead() // liveness
}
g.server.GracefulStop()
return nil
}
7 changes: 7 additions & 0 deletions grpcserver/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package grpcserver

import (
"github.com/eccles/hestia/logger"
)

type Logger = logger.Logger
Loading