diff --git a/.travis.yml b/.travis.yml index b8c52dd..ff61464 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,4 +6,4 @@ - 1.4.2 - 1.5.3 - 1.6 - - tip + #- tip diff --git a/examples/profilesvc/endpoints.go b/examples/profilesvc/endpoints.go index 7ab9de8..062cf9a 100644 --- a/examples/profilesvc/endpoints.go +++ b/examples/profilesvc/endpoints.go @@ -41,15 +41,16 @@ func (r postProfileResponse) error() error { return r.Err } -// Regarding errors returned from service methods, we have two options. We can -// return the error via the endpoint itself. That makes certain things a -// little bit easier, like providing non-200 HTTP responses to the client. But -// Go kit assumes that endpoint errors are (or may be treated as) transport- -// level errors. For example, an endpoint error will count against a circuit -// breaker error count. Therefore, it's almost certainly better to return -// service method (business logic) errors in the response object. This means -// we have to do a bit more work in the HTTP response encoder to detect e.g. a -// not-found error and provide a proper HTTP status code. +// Regarding errors returned from service (business logic) methods, we have two +// options. We could return the error via the endpoint itself. That makes +// certain things a little bit easier, like providing non-200 HTTP responses to +// the client. But Go kit assumes that endpoint errors are (or may be treated +// as) transport-domain errors. For example, an endpoint error will count +// against a circuit breaker error count. Therefore, it's almost certainly +// better to return service (business logic) errors in the response object. This +// means we have to do a bit more work in the HTTP response encoder to detect +// e.g. a not-found error and provide a proper HTTP status code. That work is +// done with the errorer interface, in transport.go. func makePostProfileEndpoint(s ProfileService) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { diff --git a/examples/profilesvc/logging_middleware.go b/examples/profilesvc/logging_middleware.go deleted file mode 100644 index 698e268..0000000 --- a/examples/profilesvc/logging_middleware.go +++ /dev/null @@ -1,77 +0,0 @@ -package main - -import ( - "time" - - "golang.org/x/net/context" - - "github.com/go-kit/kit/log" -) - -type loggingMiddleware struct { - next ProfileService - logger log.Logger -} - -func (mw loggingMiddleware) PostProfile(ctx context.Context, p Profile) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "PostProfile", "id", p.ID, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.PostProfile(ctx, p) -} - -func (mw loggingMiddleware) GetProfile(ctx context.Context, id string) (p Profile, err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "GetProfile", "id", id, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.GetProfile(ctx, id) -} - -func (mw loggingMiddleware) PutProfile(ctx context.Context, id string, p Profile) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "PutProfile", "id", id, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.PutProfile(ctx, id, p) -} - -func (mw loggingMiddleware) PatchProfile(ctx context.Context, id string, p Profile) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "PatchProfile", "id", id, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.PatchProfile(ctx, id, p) -} - -func (mw loggingMiddleware) DeleteProfile(ctx context.Context, id string) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "DeleteProfile", "id", id, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.DeleteProfile(ctx, id) -} - -func (mw loggingMiddleware) GetAddresses(ctx context.Context, profileID string) (addresses []Address, err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "GetAddresses", "profileID", profileID, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.GetAddresses(ctx, profileID) -} - -func (mw loggingMiddleware) GetAddress(ctx context.Context, profileID string, addressID string) (a Address, err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "GetAddress", "profileID", profileID, "addressID", addressID, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.GetAddress(ctx, profileID, addressID) -} - -func (mw loggingMiddleware) PostAddress(ctx context.Context, profileID string, a Address) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "PostAddress", "profileID", profileID, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.PostAddress(ctx, profileID, a) -} - -func (mw loggingMiddleware) DeleteAddress(ctx context.Context, profileID string, addressID string) (err error) { - defer func(begin time.Time) { - mw.logger.Log("method", "DeleteAddress", "profileID", profileID, "addressID", addressID, "took", time.Since(begin), "err", err) - }(time.Now()) - return mw.next.DeleteAddress(ctx, profileID, addressID) -} diff --git a/examples/profilesvc/middlewares.go b/examples/profilesvc/middlewares.go new file mode 100644 index 0000000..698e268 --- /dev/null +++ b/examples/profilesvc/middlewares.go @@ -0,0 +1,77 @@ +package main + +import ( + "time" + + "golang.org/x/net/context" + + "github.com/go-kit/kit/log" +) + +type loggingMiddleware struct { + next ProfileService + logger log.Logger +} + +func (mw loggingMiddleware) PostProfile(ctx context.Context, p Profile) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "PostProfile", "id", p.ID, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.PostProfile(ctx, p) +} + +func (mw loggingMiddleware) GetProfile(ctx context.Context, id string) (p Profile, err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "GetProfile", "id", id, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.GetProfile(ctx, id) +} + +func (mw loggingMiddleware) PutProfile(ctx context.Context, id string, p Profile) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "PutProfile", "id", id, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.PutProfile(ctx, id, p) +} + +func (mw loggingMiddleware) PatchProfile(ctx context.Context, id string, p Profile) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "PatchProfile", "id", id, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.PatchProfile(ctx, id, p) +} + +func (mw loggingMiddleware) DeleteProfile(ctx context.Context, id string) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "DeleteProfile", "id", id, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.DeleteProfile(ctx, id) +} + +func (mw loggingMiddleware) GetAddresses(ctx context.Context, profileID string) (addresses []Address, err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "GetAddresses", "profileID", profileID, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.GetAddresses(ctx, profileID) +} + +func (mw loggingMiddleware) GetAddress(ctx context.Context, profileID string, addressID string) (a Address, err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "GetAddress", "profileID", profileID, "addressID", addressID, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.GetAddress(ctx, profileID, addressID) +} + +func (mw loggingMiddleware) PostAddress(ctx context.Context, profileID string, a Address) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "PostAddress", "profileID", profileID, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.PostAddress(ctx, profileID, a) +} + +func (mw loggingMiddleware) DeleteAddress(ctx context.Context, profileID string, addressID string) (err error) { + defer func(begin time.Time) { + mw.logger.Log("method", "DeleteAddress", "profileID", profileID, "addressID", addressID, "took", time.Since(begin), "err", err) + }(time.Now()) + return mw.next.DeleteAddress(ctx, profileID, addressID) +} diff --git a/examples/profilesvc/profile_service.go b/examples/profilesvc/profile_service.go deleted file mode 100644 index 9e12100..0000000 --- a/examples/profilesvc/profile_service.go +++ /dev/null @@ -1,186 +0,0 @@ -package main - -import ( - "errors" - "sync" - - "golang.org/x/net/context" -) - -// ProfileService is a simple CRUD interface for user profiles. -type ProfileService interface { - PostProfile(ctx context.Context, p Profile) error - GetProfile(ctx context.Context, id string) (Profile, error) - PutProfile(ctx context.Context, id string, p Profile) error - PatchProfile(ctx context.Context, id string, p Profile) error - DeleteProfile(ctx context.Context, id string) error - GetAddresses(ctx context.Context, profileID string) ([]Address, error) - GetAddress(ctx context.Context, profileID string, addressID string) (Address, error) - PostAddress(ctx context.Context, profileID string, a Address) error - DeleteAddress(ctx context.Context, profileID string, addressID string) error -} - -// Profile represents a single user profile. -// ID should be globally unique. -type Profile struct { - ID string `json:"id"` - Name string `json:"name,omitempty"` - Addresses []Address `json:"addresses,omitempty"` -} - -// Address is a field of a user profile. -// ID should be unique within the profile (at a minimum). -type Address struct { - ID string `json:"id"` - Location string `json:"location,omitempty"` -} - -var ( - errInconsistentIDs = errors.New("inconsistent IDs") - errAlreadyExists = errors.New("already exists") - errNotFound = errors.New("not found") -) - -type inmemService struct { - mtx sync.RWMutex - m map[string]Profile -} - -func newInmemService() ProfileService { - return &inmemService{ - m: map[string]Profile{}, - } -} - -func (s *inmemService) PostProfile(ctx context.Context, p Profile) error { - s.mtx.Lock() - defer s.mtx.Unlock() - if _, ok := s.m[p.ID]; ok { - return errAlreadyExists // POST = create, don't overwrite - } - s.m[p.ID] = p - return nil -} - -func (s *inmemService) GetProfile(ctx context.Context, id string) (Profile, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - p, ok := s.m[id] - if !ok { - return Profile{}, errNotFound - } - return p, nil -} - -func (s *inmemService) PutProfile(ctx context.Context, id string, p Profile) error { - if id != p.ID { - return errInconsistentIDs - } - s.mtx.Lock() - defer s.mtx.Unlock() - s.m[id] = p // PUT = create or update - return nil -} - -func (s *inmemService) PatchProfile(ctx context.Context, id string, p Profile) error { - if p.ID != "" && id != p.ID { - return errInconsistentIDs - } - - s.mtx.Lock() - defer s.mtx.Unlock() - - existing, ok := s.m[id] - if !ok { - return errNotFound // PATCH = update existing, don't create - } - - // We assume that it's not possible to PATCH the ID, and that it's not - // possible to PATCH any field to its zero value. That is, the zero value - // means not specified. The way around this is to use e.g. Name *string in - // the Profile definition. But since this is just a demonstrative example, - // I'm leaving that out. - - if p.Name != "" { - existing.Name = p.Name - } - if len(p.Addresses) > 0 { - existing.Addresses = p.Addresses - } - s.m[id] = existing - return nil -} - -func (s *inmemService) DeleteProfile(ctx context.Context, id string) error { - s.mtx.Lock() - defer s.mtx.Unlock() - if _, ok := s.m[id]; !ok { - return errNotFound - } - delete(s.m, id) - return nil -} - -func (s *inmemService) GetAddresses(ctx context.Context, profileID string) ([]Address, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - p, ok := s.m[profileID] - if !ok { - return []Address{}, errNotFound - } - return p.Addresses, nil -} - -func (s *inmemService) GetAddress(ctx context.Context, profileID string, addressID string) (Address, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - p, ok := s.m[profileID] - if !ok { - return Address{}, errNotFound - } - for _, address := range p.Addresses { - if address.ID == addressID { - return address, nil - } - } - return Address{}, errNotFound -} - -func (s *inmemService) PostAddress(ctx context.Context, profileID string, a Address) error { - s.mtx.Lock() - defer s.mtx.Unlock() - p, ok := s.m[profileID] - if !ok { - return errNotFound - } - for _, address := range p.Addresses { - if address.ID == a.ID { - return errAlreadyExists - } - } - p.Addresses = append(p.Addresses, a) - s.m[profileID] = p - return nil -} - -func (s *inmemService) DeleteAddress(ctx context.Context, profileID string, addressID string) error { - s.mtx.Lock() - defer s.mtx.Unlock() - p, ok := s.m[profileID] - if !ok { - return errNotFound - } - newAddresses := make([]Address, 0, len(p.Addresses)) - for _, address := range p.Addresses { - if address.ID == addressID { - continue // delete - } - newAddresses = append(newAddresses, address) - } - if len(newAddresses) == len(p.Addresses) { - return errNotFound - } - p.Addresses = newAddresses - s.m[profileID] = p - return nil -} diff --git a/examples/profilesvc/service.go b/examples/profilesvc/service.go new file mode 100644 index 0000000..9e12100 --- /dev/null +++ b/examples/profilesvc/service.go @@ -0,0 +1,186 @@ +package main + +import ( + "errors" + "sync" + + "golang.org/x/net/context" +) + +// ProfileService is a simple CRUD interface for user profiles. +type ProfileService interface { + PostProfile(ctx context.Context, p Profile) error + GetProfile(ctx context.Context, id string) (Profile, error) + PutProfile(ctx context.Context, id string, p Profile) error + PatchProfile(ctx context.Context, id string, p Profile) error + DeleteProfile(ctx context.Context, id string) error + GetAddresses(ctx context.Context, profileID string) ([]Address, error) + GetAddress(ctx context.Context, profileID string, addressID string) (Address, error) + PostAddress(ctx context.Context, profileID string, a Address) error + DeleteAddress(ctx context.Context, profileID string, addressID string) error +} + +// Profile represents a single user profile. +// ID should be globally unique. +type Profile struct { + ID string `json:"id"` + Name string `json:"name,omitempty"` + Addresses []Address `json:"addresses,omitempty"` +} + +// Address is a field of a user profile. +// ID should be unique within the profile (at a minimum). +type Address struct { + ID string `json:"id"` + Location string `json:"location,omitempty"` +} + +var ( + errInconsistentIDs = errors.New("inconsistent IDs") + errAlreadyExists = errors.New("already exists") + errNotFound = errors.New("not found") +) + +type inmemService struct { + mtx sync.RWMutex + m map[string]Profile +} + +func newInmemService() ProfileService { + return &inmemService{ + m: map[string]Profile{}, + } +} + +func (s *inmemService) PostProfile(ctx context.Context, p Profile) error { + s.mtx.Lock() + defer s.mtx.Unlock() + if _, ok := s.m[p.ID]; ok { + return errAlreadyExists // POST = create, don't overwrite + } + s.m[p.ID] = p + return nil +} + +func (s *inmemService) GetProfile(ctx context.Context, id string) (Profile, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + p, ok := s.m[id] + if !ok { + return Profile{}, errNotFound + } + return p, nil +} + +func (s *inmemService) PutProfile(ctx context.Context, id string, p Profile) error { + if id != p.ID { + return errInconsistentIDs + } + s.mtx.Lock() + defer s.mtx.Unlock() + s.m[id] = p // PUT = create or update + return nil +} + +func (s *inmemService) PatchProfile(ctx context.Context, id string, p Profile) error { + if p.ID != "" && id != p.ID { + return errInconsistentIDs + } + + s.mtx.Lock() + defer s.mtx.Unlock() + + existing, ok := s.m[id] + if !ok { + return errNotFound // PATCH = update existing, don't create + } + + // We assume that it's not possible to PATCH the ID, and that it's not + // possible to PATCH any field to its zero value. That is, the zero value + // means not specified. The way around this is to use e.g. Name *string in + // the Profile definition. But since this is just a demonstrative example, + // I'm leaving that out. + + if p.Name != "" { + existing.Name = p.Name + } + if len(p.Addresses) > 0 { + existing.Addresses = p.Addresses + } + s.m[id] = existing + return nil +} + +func (s *inmemService) DeleteProfile(ctx context.Context, id string) error { + s.mtx.Lock() + defer s.mtx.Unlock() + if _, ok := s.m[id]; !ok { + return errNotFound + } + delete(s.m, id) + return nil +} + +func (s *inmemService) GetAddresses(ctx context.Context, profileID string) ([]Address, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + p, ok := s.m[profileID] + if !ok { + return []Address{}, errNotFound + } + return p.Addresses, nil +} + +func (s *inmemService) GetAddress(ctx context.Context, profileID string, addressID string) (Address, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + p, ok := s.m[profileID] + if !ok { + return Address{}, errNotFound + } + for _, address := range p.Addresses { + if address.ID == addressID { + return address, nil + } + } + return Address{}, errNotFound +} + +func (s *inmemService) PostAddress(ctx context.Context, profileID string, a Address) error { + s.mtx.Lock() + defer s.mtx.Unlock() + p, ok := s.m[profileID] + if !ok { + return errNotFound + } + for _, address := range p.Addresses { + if address.ID == a.ID { + return errAlreadyExists + } + } + p.Addresses = append(p.Addresses, a) + s.m[profileID] = p + return nil +} + +func (s *inmemService) DeleteAddress(ctx context.Context, profileID string, addressID string) error { + s.mtx.Lock() + defer s.mtx.Unlock() + p, ok := s.m[profileID] + if !ok { + return errNotFound + } + newAddresses := make([]Address, 0, len(p.Addresses)) + for _, address := range p.Addresses { + if address.ID == addressID { + continue // delete + } + newAddresses = append(newAddresses, address) + } + if len(newAddresses) == len(p.Addresses) { + return errNotFound + } + p.Addresses = newAddresses + s.m[profileID] = p + return nil +} diff --git a/examples/profilesvc/transport.go b/examples/profilesvc/transport.go index 6bae7a9..c1951af 100644 --- a/examples/profilesvc/transport.go +++ b/examples/profilesvc/transport.go @@ -219,7 +219,7 @@ // errorer is implemented by all concrete response types. It allows us to // change the HTTP response code without needing to trigger an endpoint // (transport-level) error. For more information, read the big comment in -// endpoint.go. +// endpoints.go. type errorer interface { error() error } @@ -239,6 +239,9 @@ } func encodeError(w stdhttp.ResponseWriter, err error) { + if err == nil { + panic("encodeError with nil error") + } w.WriteHeader(codeFrom(err)) json.NewEncoder(w).Encode(map[string]interface{}{ "error": err.Error(), @@ -247,8 +250,6 @@ func codeFrom(err error) int { switch err { - case nil: - return stdhttp.StatusOK case errNotFound: return stdhttp.StatusNotFound case errAlreadyExists, errInconsistentIDs: diff --git a/examples/shipping/README.md b/examples/shipping/README.md new file mode 100644 index 0000000..cbcc4df --- /dev/null +++ b/examples/shipping/README.md @@ -0,0 +1,25 @@ +# shipping + +This example demonstrates a more real-world application consisting of multiple services. + +## Description + +The implementation is based on the container shipping domain from the [Domain Driven Design](http://www.amazon.com/Domain-Driven-Design-Tackling-Complexity-Software/dp/0321125215) book by Eric Evans, which was [originally](http://dddsample.sourceforge.net/) implemented in Java but has since been ported to Go. This example is a somewhat stripped down version to demonstrate the use of Go kit. The [original Go application](https://github.com/marcusolsson/goddd) is maintained separately and accompanied by an [AngularJS application](https://github.com/marcusolsson/dddelivery-angularjs) as well as a mock [routing service](https://github.com/marcusolsson/pathfinder). + +### Organization + +The application consists of three application services, `booking`, `handling` and `tracking`. Each of these is an individual Go kit service as seen in previous examples. + +- __booking__ - used by the shipping company to book and route cargos. +- __handling__ - used by our staff around the world to register whenever the cargo has been received, loaded etc. +- __tracking__ - used by the customer to track the cargo along the route + +There are also a few pure domain packages that contain some intricate business-logic. They provide domain objects and services that are used by each application service to provide interesting use-cases for the user. + +`repository` contains in-memory implementations for the repositories found in the domain packages. + +The `routing` package provides a _domain service_ that is used to query an external application for possible routes. + +## Contributing + +As with all Go kit examples you are more than welcome to contribute. If you do however, please consider contributing back to the original project as well. diff --git a/examples/shipping/booking/endpoint.go b/examples/shipping/booking/endpoint.go new file mode 100644 index 0000000..b9864d2 --- /dev/null +++ b/examples/shipping/booking/endpoint.go @@ -0,0 +1,139 @@ +package booking + +import ( + "time" + + "golang.org/x/net/context" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" +) + +type bookCargoRequest struct { + Origin location.UNLocode + Destination location.UNLocode + ArrivalDeadline time.Time +} + +type bookCargoResponse struct { + ID cargo.TrackingID `json:"tracking_id,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r bookCargoResponse) error() error { return r.Err } + +func makeBookCargoEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(bookCargoRequest) + id, err := s.BookNewCargo(req.Origin, req.Destination, req.ArrivalDeadline) + return bookCargoResponse{ID: id, Err: err}, nil + } +} + +type loadCargoRequest struct { + ID cargo.TrackingID +} + +type loadCargoResponse struct { + Cargo *Cargo `json:"cargo,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r loadCargoResponse) error() error { return r.Err } + +func makeLoadCargoEndpoint(bs Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(loadCargoRequest) + c, err := bs.LoadCargo(req.ID) + return loadCargoResponse{Cargo: &c, Err: err}, nil + } +} + +type requestRoutesRequest struct { + ID cargo.TrackingID +} + +type requestRoutesResponse struct { + Routes []cargo.Itinerary `json:"routes,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r requestRoutesResponse) error() error { return r.Err } + +func makeRequestRoutesEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(requestRoutesRequest) + itin := s.RequestPossibleRoutesForCargo(req.ID) + return requestRoutesResponse{Routes: itin, Err: nil}, nil + } +} + +type assignToRouteRequest struct { + ID cargo.TrackingID + Itinerary cargo.Itinerary +} + +type assignToRouteResponse struct { + Err error `json:"error,omitempty"` +} + +func (r assignToRouteResponse) error() error { return r.Err } + +func makeAssignToRouteEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(assignToRouteRequest) + err := s.AssignCargoToRoute(req.ID, req.Itinerary) + return assignToRouteResponse{Err: err}, nil + } +} + +type changeDestinationRequest struct { + ID cargo.TrackingID + Destination location.UNLocode +} + +type changeDestinationResponse struct { + Err error `json:"error,omitempty"` +} + +func (r changeDestinationResponse) error() error { return r.Err } + +func makeChangeDestinationEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(changeDestinationRequest) + err := s.ChangeDestination(req.ID, req.Destination) + return changeDestinationResponse{Err: err}, nil + } +} + +type listCargosRequest struct{} + +type listCargosResponse struct { + Cargos []Cargo `json:"cargos,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r listCargosResponse) error() error { return r.Err } + +func makeListCargosEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + _ = request.(listCargosRequest) + return listCargosResponse{Cargos: s.Cargos(), Err: nil}, nil + } +} + +type listLocationsRequest struct { +} + +type listLocationsResponse struct { + Locations []Location `json:"locations,omitempty"` + Err error `json:"error,omitempty"` +} + +func makeListLocationsEndpoint(s Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + _ = request.(listLocationsRequest) + return listLocationsResponse{Locations: s.Locations(), Err: nil}, nil + } +} diff --git a/examples/shipping/booking/logging.go b/examples/shipping/booking/logging.go new file mode 100644 index 0000000..3a04576 --- /dev/null +++ b/examples/shipping/booking/logging.go @@ -0,0 +1,101 @@ +package booking + +import ( + "time" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/log" +) + +type loggingService struct { + logger log.Logger + Service +} + +// NewLoggingService returns a new instance of a logging Service. +func NewLoggingService(logger log.Logger, s Service) Service { + return &loggingService{logger, s} +} + +func (s *loggingService) BookNewCargo(origin location.UNLocode, destination location.UNLocode, arrivalDeadline time.Time) (id cargo.TrackingID, err error) { + defer func(begin time.Time) { + s.logger.Log( + "method", "book", + "origin", origin, + "destination", destination, + "arrival_deadline", arrivalDeadline, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return s.Service.BookNewCargo(origin, destination, arrivalDeadline) +} + +func (s *loggingService) LoadCargo(id cargo.TrackingID) (c Cargo, err error) { + defer func(begin time.Time) { + s.logger.Log( + "method", "load", + "tracking_id", id, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return s.Service.LoadCargo(id) +} + +func (s *loggingService) RequestPossibleRoutesForCargo(id cargo.TrackingID) []cargo.Itinerary { + defer func(begin time.Time) { + s.logger.Log( + "method", "request_routes", + "tracking_id", id, + "took", time.Since(begin), + ) + }(time.Now()) + return s.Service.RequestPossibleRoutesForCargo(id) +} + +func (s *loggingService) AssignCargoToRoute(id cargo.TrackingID, itinerary cargo.Itinerary) (err error) { + defer func(begin time.Time) { + s.logger.Log( + "method", "assign_to_route", + "tracking_id", id, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return s.Service.AssignCargoToRoute(id, itinerary) +} + +func (s *loggingService) ChangeDestination(id cargo.TrackingID, l location.UNLocode) (err error) { + defer func(begin time.Time) { + s.logger.Log( + "method", "change_destination", + "tracking_id", id, + "destination", l, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return s.Service.ChangeDestination(id, l) +} + +func (s *loggingService) Cargos() []Cargo { + defer func(begin time.Time) { + s.logger.Log( + "method", "list_cargos", + "took", time.Since(begin), + ) + }(time.Now()) + return s.Service.Cargos() +} + +func (s *loggingService) Locations() []Location { + defer func(begin time.Time) { + s.logger.Log( + "method", "list_locations", + "took", time.Since(begin), + ) + }(time.Now()) + return s.Service.Locations() +} diff --git a/examples/shipping/booking/service.go b/examples/shipping/booking/service.go new file mode 100644 index 0000000..47605f8 --- /dev/null +++ b/examples/shipping/booking/service.go @@ -0,0 +1,201 @@ +// Package booking provides the use-case of booking a cargo. Used by views +// facing an administrator. +package booking + +import ( + "errors" + "time" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/routing" +) + +// ErrInvalidArgument is returned when one or more arguments are invalid. +var ErrInvalidArgument = errors.New("invalid argument") + +// Service is the interface that provides booking methods. +type Service interface { + // BookNewCargo registers a new cargo in the tracking system, not yet + // routed. + BookNewCargo(origin location.UNLocode, destination location.UNLocode, arrivalDeadline time.Time) (cargo.TrackingID, error) + + // LoadCargo returns a read model of a cargo. + LoadCargo(trackingID cargo.TrackingID) (Cargo, error) + + // RequestPossibleRoutesForCargo requests a list of itineraries describing + // possible routes for this cargo. + RequestPossibleRoutesForCargo(trackingID cargo.TrackingID) []cargo.Itinerary + + // AssignCargoToRoute assigns a cargo to the route specified by the + // itinerary. + AssignCargoToRoute(trackingID cargo.TrackingID, itinerary cargo.Itinerary) error + + // ChangeDestination changes the destination of a cargo. + ChangeDestination(trackingID cargo.TrackingID, unLocode location.UNLocode) error + + // Cargos returns a list of all cargos that have been booked. + Cargos() []Cargo + + // Locations returns a list of registered locations. + Locations() []Location +} + +type service struct { + cargoRepository cargo.Repository + locationRepository location.Repository + routingService routing.Service + handlingEventRepository cargo.HandlingEventRepository +} + +func (s *service) AssignCargoToRoute(id cargo.TrackingID, itinerary cargo.Itinerary) error { + if id == "" || len(itinerary.Legs) == 0 { + return ErrInvalidArgument + } + + c, err := s.cargoRepository.Find(id) + if err != nil { + return err + } + + c.AssignToRoute(itinerary) + + if err := s.cargoRepository.Store(c); err != nil { + return err + } + + return nil +} + +func (s *service) BookNewCargo(origin, destination location.UNLocode, arrivalDeadline time.Time) (cargo.TrackingID, error) { + if origin == "" || destination == "" || arrivalDeadline.IsZero() { + return "", ErrInvalidArgument + } + + id := cargo.NextTrackingID() + rs := cargo.RouteSpecification{ + Origin: origin, + Destination: destination, + ArrivalDeadline: arrivalDeadline, + } + + c := cargo.New(id, rs) + + if err := s.cargoRepository.Store(c); err != nil { + return "", err + } + + return c.TrackingID, nil +} + +func (s *service) LoadCargo(trackingID cargo.TrackingID) (Cargo, error) { + if trackingID == "" { + return Cargo{}, ErrInvalidArgument + } + + c, err := s.cargoRepository.Find(trackingID) + if err != nil { + return Cargo{}, err + } + + return assemble(c, s.handlingEventRepository), nil +} + +func (s *service) ChangeDestination(id cargo.TrackingID, destination location.UNLocode) error { + if id == "" || destination == "" { + return ErrInvalidArgument + } + + c, err := s.cargoRepository.Find(id) + if err != nil { + return err + } + + l, err := s.locationRepository.Find(destination) + if err != nil { + return err + } + + c.SpecifyNewRoute(cargo.RouteSpecification{ + Origin: c.Origin, + Destination: l.UNLocode, + ArrivalDeadline: c.RouteSpecification.ArrivalDeadline, + }) + + if err := s.cargoRepository.Store(c); err != nil { + return err + } + + return nil +} + +func (s *service) RequestPossibleRoutesForCargo(id cargo.TrackingID) []cargo.Itinerary { + if id == "" { + return nil + } + + c, err := s.cargoRepository.Find(id) + if err != nil { + return []cargo.Itinerary{} + } + + return s.routingService.FetchRoutesForSpecification(c.RouteSpecification) +} + +func (s *service) Cargos() []Cargo { + var result []Cargo + for _, c := range s.cargoRepository.FindAll() { + result = append(result, assemble(c, s.handlingEventRepository)) + } + return result +} + +func (s *service) Locations() []Location { + var result []Location + for _, v := range s.locationRepository.FindAll() { + result = append(result, Location{ + UNLocode: string(v.UNLocode), + Name: v.Name, + }) + } + return result +} + +// NewService creates a booking service with necessary dependencies. +func NewService(cr cargo.Repository, lr location.Repository, her cargo.HandlingEventRepository, rs routing.Service) Service { + return &service{ + cargoRepository: cr, + locationRepository: lr, + handlingEventRepository: her, + routingService: rs, + } +} + +// Location is a read model for booking views. +type Location struct { + UNLocode string `json:"locode"` + Name string `json:"name"` +} + +// Cargo is a read model for booking views. +type Cargo struct { + ArrivalDeadline time.Time `json:"arrival_deadline"` + Destination string `json:"destination"` + Legs []cargo.Leg `json:"legs,omitempty"` + Misrouted bool `json:"misrouted"` + Origin string `json:"origin"` + Routed bool `json:"routed"` + TrackingID string `json:"tracking_id"` +} + +func assemble(c *cargo.Cargo, her cargo.HandlingEventRepository) Cargo { + return Cargo{ + TrackingID: string(c.TrackingID), + Origin: string(c.Origin), + Destination: string(c.RouteSpecification.Destination), + Misrouted: c.Delivery.RoutingStatus == cargo.Misrouted, + Routed: !c.Itinerary.IsEmpty(), + ArrivalDeadline: c.RouteSpecification.ArrivalDeadline, + Legs: c.Itinerary.Legs, + } +} diff --git a/examples/shipping/booking/transport.go b/examples/shipping/booking/transport.go new file mode 100644 index 0000000..9999e17 --- /dev/null +++ b/examples/shipping/booking/transport.go @@ -0,0 +1,201 @@ +package booking + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/gorilla/mux" + "golang.org/x/net/context" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + kitlog "github.com/go-kit/kit/log" + kithttp "github.com/go-kit/kit/transport/http" +) + +// MakeHandler returns a handler for the booking service. +func MakeHandler(ctx context.Context, bs Service, logger kitlog.Logger) http.Handler { + opts := []kithttp.ServerOption{ + kithttp.ServerErrorLogger(logger), + kithttp.ServerErrorEncoder(encodeError), + } + + bookCargoHandler := kithttp.NewServer( + ctx, + makeBookCargoEndpoint(bs), + decodeBookCargoRequest, + encodeResponse, + opts..., + ) + loadCargoHandler := kithttp.NewServer( + ctx, + makeLoadCargoEndpoint(bs), + decodeLoadCargoRequest, + encodeResponse, + opts..., + ) + requestRoutesHandler := kithttp.NewServer( + ctx, + makeRequestRoutesEndpoint(bs), + decodeRequestRoutesRequest, + encodeResponse, + opts..., + ) + assignToRouteHandler := kithttp.NewServer( + ctx, + makeAssignToRouteEndpoint(bs), + decodeAssignToRouteRequest, + encodeResponse, + opts..., + ) + changeDestinationHandler := kithttp.NewServer( + ctx, + makeChangeDestinationEndpoint(bs), + decodeChangeDestinationRequest, + encodeResponse, + opts..., + ) + listCargosHandler := kithttp.NewServer( + ctx, + makeListCargosEndpoint(bs), + decodeListCargosRequest, + encodeResponse, + opts..., + ) + listLocationsHandler := kithttp.NewServer( + ctx, + makeListLocationsEndpoint(bs), + decodeListLocationsRequest, + encodeResponse, + opts..., + ) + + r := mux.NewRouter() + + r.Handle("/booking/v1/cargos", bookCargoHandler).Methods("POST") + r.Handle("/booking/v1/cargos", listCargosHandler).Methods("GET") + r.Handle("/booking/v1/cargos/{id}", loadCargoHandler).Methods("GET") + r.Handle("/booking/v1/cargos/{id}/request_routes", requestRoutesHandler).Methods("GET") + r.Handle("/booking/v1/cargos/{id}/assign_to_route", assignToRouteHandler).Methods("POST") + r.Handle("/booking/v1/cargos/{id}/change_destination", changeDestinationHandler).Methods("POST") + r.Handle("/booking/v1/locations", listLocationsHandler).Methods("GET") + r.Handle("/booking/v1/docs", http.StripPrefix("/booking/v1/docs", http.FileServer(http.Dir("booking/docs")))) + + return r +} + +var errBadRoute = errors.New("bad route") + +func decodeBookCargoRequest(r *http.Request) (interface{}, error) { + var body struct { + Origin string `json:"origin"` + Destination string `json:"destination"` + ArrivalDeadline time.Time `json:"arrival_deadline"` + } + + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + return nil, err + } + + return bookCargoRequest{ + Origin: location.UNLocode(body.Origin), + Destination: location.UNLocode(body.Destination), + ArrivalDeadline: body.ArrivalDeadline, + }, nil +} + +func decodeLoadCargoRequest(r *http.Request) (interface{}, error) { + vars := mux.Vars(r) + id, ok := vars["id"] + if !ok { + return nil, errBadRoute + } + return loadCargoRequest{ID: cargo.TrackingID(id)}, nil +} + +func decodeRequestRoutesRequest(r *http.Request) (interface{}, error) { + vars := mux.Vars(r) + id, ok := vars["id"] + if !ok { + return nil, errBadRoute + } + return requestRoutesRequest{ID: cargo.TrackingID(id)}, nil +} + +func decodeAssignToRouteRequest(r *http.Request) (interface{}, error) { + vars := mux.Vars(r) + id, ok := vars["id"] + if !ok { + return nil, errBadRoute + } + + var itinerary cargo.Itinerary + if err := json.NewDecoder(r.Body).Decode(&itinerary); err != nil { + return nil, err + } + + return assignToRouteRequest{ + ID: cargo.TrackingID(id), + Itinerary: itinerary, + }, nil +} + +func decodeChangeDestinationRequest(r *http.Request) (interface{}, error) { + vars := mux.Vars(r) + id, ok := vars["id"] + if !ok { + return nil, errBadRoute + } + + var body struct { + Destination string `json:"destination"` + } + + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + return nil, err + } + + return changeDestinationRequest{ + ID: cargo.TrackingID(id), + Destination: location.UNLocode(body.Destination), + }, nil +} + +func decodeListCargosRequest(r *http.Request) (interface{}, error) { + return listCargosRequest{}, nil +} + +func decodeListLocationsRequest(r *http.Request) (interface{}, error) { + return listLocationsRequest{}, nil +} + +func encodeResponse(w http.ResponseWriter, response interface{}) error { + if e, ok := response.(errorer); ok && e.error() != nil { + encodeError(w, e.error()) + return nil + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) +} + +type errorer interface { + error() error +} + +// encode errors from business-logic +func encodeError(w http.ResponseWriter, err error) { + switch err { + case cargo.ErrUnknown: + w.WriteHeader(http.StatusNotFound) + case ErrInvalidArgument: + w.WriteHeader(http.StatusBadRequest) + default: + w.WriteHeader(http.StatusInternalServerError) + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + json.NewEncoder(w).Encode(map[string]interface{}{ + "error": err.Error(), + }) +} diff --git a/examples/shipping/cargo/cargo.go b/examples/shipping/cargo/cargo.go new file mode 100644 index 0000000..d4bb5f4 --- /dev/null +++ b/examples/shipping/cargo/cargo.go @@ -0,0 +1,137 @@ +// Package cargo contains the heart of the domain model. +package cargo + +import ( + "errors" + "strings" + "time" + + "github.com/pborman/uuid" + + "github.com/go-kit/kit/examples/shipping/location" +) + +// TrackingID uniquely identifies a particular cargo. +type TrackingID string + +// Cargo is the central class in the domain model. +type Cargo struct { + TrackingID TrackingID + Origin location.UNLocode + RouteSpecification RouteSpecification + Itinerary Itinerary + Delivery Delivery +} + +// SpecifyNewRoute specifies a new route for this cargo. +func (c *Cargo) SpecifyNewRoute(rs RouteSpecification) { + c.RouteSpecification = rs + c.Delivery = c.Delivery.UpdateOnRouting(c.RouteSpecification, c.Itinerary) +} + +// AssignToRoute attaches a new itinerary to this cargo. +func (c *Cargo) AssignToRoute(itinerary Itinerary) { + c.Itinerary = itinerary + c.Delivery = c.Delivery.UpdateOnRouting(c.RouteSpecification, c.Itinerary) +} + +// DeriveDeliveryProgress updates all aspects of the cargo aggregate status +// based on the current route specification, itinerary and handling of the cargo. +func (c *Cargo) DeriveDeliveryProgress(history HandlingHistory) { + c.Delivery = DeriveDeliveryFrom(c.RouteSpecification, c.Itinerary, history) +} + +// New creates a new, unrouted cargo. +func New(id TrackingID, rs RouteSpecification) *Cargo { + itinerary := Itinerary{} + history := HandlingHistory{make([]HandlingEvent, 0)} + + return &Cargo{ + TrackingID: id, + Origin: rs.Origin, + RouteSpecification: rs, + Delivery: DeriveDeliveryFrom(rs, itinerary, history), + } +} + +// Repository provides access a cargo store. +type Repository interface { + Store(cargo *Cargo) error + Find(trackingID TrackingID) (*Cargo, error) + FindAll() []*Cargo +} + +// ErrUnknown is used when a cargo could not be found. +var ErrUnknown = errors.New("unknown cargo") + +// NextTrackingID generates a new tracking ID. +// TODO: Move to infrastructure(?) +func NextTrackingID() TrackingID { + return TrackingID(strings.Split(strings.ToUpper(uuid.New()), "-")[0]) +} + +// RouteSpecification Contains information about a route: its origin, +// destination and arrival deadline. +type RouteSpecification struct { + Origin location.UNLocode + Destination location.UNLocode + ArrivalDeadline time.Time +} + +// IsSatisfiedBy checks whether provided itinerary satisfies this +// specification. +func (s RouteSpecification) IsSatisfiedBy(itinerary Itinerary) bool { + return itinerary.Legs != nil && + s.Origin == itinerary.InitialDepartureLocation() && + s.Destination == itinerary.FinalArrivalLocation() +} + +// RoutingStatus describes status of cargo routing. +type RoutingStatus int + +// Valid routing statuses. +const ( + NotRouted RoutingStatus = iota + Misrouted + Routed +) + +func (s RoutingStatus) String() string { + switch s { + case NotRouted: + return "Not routed" + case Misrouted: + return "Misrouted" + case Routed: + return "Routed" + } + return "" +} + +// TransportStatus describes status of cargo transportation. +type TransportStatus int + +// Valid transport statuses. +const ( + NotReceived TransportStatus = iota + InPort + OnboardCarrier + Claimed + Unknown +) + +func (s TransportStatus) String() string { + switch s { + case NotReceived: + return "Not received" + case InPort: + return "In port" + case OnboardCarrier: + return "Onboard carrier" + case Claimed: + return "Claimed" + case Unknown: + return "Unknown" + } + return "" +} diff --git a/examples/shipping/cargo/delivery.go b/examples/shipping/cargo/delivery.go new file mode 100644 index 0000000..34f079d --- /dev/null +++ b/examples/shipping/cargo/delivery.go @@ -0,0 +1,174 @@ +package cargo + +import ( + "time" + + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +// Delivery is the actual transportation of the cargo, as opposed to the +// customer requirement (RouteSpecification) and the plan (Itinerary). +type Delivery struct { + Itinerary Itinerary + RouteSpecification RouteSpecification + RoutingStatus RoutingStatus + TransportStatus TransportStatus + NextExpectedActivity HandlingActivity + LastEvent HandlingEvent + LastKnownLocation location.UNLocode + CurrentVoyage voyage.Number + ETA time.Time + IsMisdirected bool + IsUnloadedAtDestination bool +} + +// UpdateOnRouting creates a new delivery snapshot to reflect changes in +// routing, i.e. when the route specification or the itinerary has changed but +// no additional handling of the cargo has been performed. +func (d Delivery) UpdateOnRouting(rs RouteSpecification, itinerary Itinerary) Delivery { + return newDelivery(d.LastEvent, itinerary, rs) +} + +// IsOnTrack checks if the delivery is on track. +func (d Delivery) IsOnTrack() bool { + return d.RoutingStatus == Routed && !d.IsMisdirected +} + +// DeriveDeliveryFrom creates a new delivery snapshot based on the complete +// handling history of a cargo, as well as its route specification and +// itinerary. +func DeriveDeliveryFrom(rs RouteSpecification, itinerary Itinerary, history HandlingHistory) Delivery { + lastEvent, _ := history.MostRecentlyCompletedEvent() + return newDelivery(lastEvent, itinerary, rs) +} + +// newDelivery creates a up-to-date delivery based on an handling event, +// itinerary and a route specification. +func newDelivery(lastEvent HandlingEvent, itinerary Itinerary, rs RouteSpecification) Delivery { + var ( + routingStatus = calculateRoutingStatus(itinerary, rs) + transportStatus = calculateTransportStatus(lastEvent) + lastKnownLocation = calculateLastKnownLocation(lastEvent) + isMisdirected = calculateMisdirectedStatus(lastEvent, itinerary) + isUnloadedAtDestination = calculateUnloadedAtDestination(lastEvent, rs) + currentVoyage = calculateCurrentVoyage(transportStatus, lastEvent) + ) + + d := Delivery{ + LastEvent: lastEvent, + Itinerary: itinerary, + RouteSpecification: rs, + RoutingStatus: routingStatus, + TransportStatus: transportStatus, + LastKnownLocation: lastKnownLocation, + IsMisdirected: isMisdirected, + IsUnloadedAtDestination: isUnloadedAtDestination, + CurrentVoyage: currentVoyage, + } + + d.NextExpectedActivity = calculateNextExpectedActivity(d) + d.ETA = calculateETA(d) + + return d +} + +// Below are internal functions used when creating a new delivery. + +func calculateRoutingStatus(itinerary Itinerary, rs RouteSpecification) RoutingStatus { + if itinerary.Legs == nil { + return NotRouted + } + + if rs.IsSatisfiedBy(itinerary) { + return Routed + } + + return Misrouted +} + +func calculateMisdirectedStatus(event HandlingEvent, itinerary Itinerary) bool { + if event.Activity.Type == NotHandled { + return false + } + + return !itinerary.IsExpected(event) +} + +func calculateUnloadedAtDestination(event HandlingEvent, rs RouteSpecification) bool { + if event.Activity.Type == NotHandled { + return false + } + + return event.Activity.Type == Unload && rs.Destination == event.Activity.Location +} + +func calculateTransportStatus(event HandlingEvent) TransportStatus { + switch event.Activity.Type { + case NotHandled: + return NotReceived + case Load: + return OnboardCarrier + case Unload: + return InPort + case Receive: + return InPort + case Customs: + return InPort + case Claim: + return Claimed + } + return Unknown +} + +func calculateLastKnownLocation(event HandlingEvent) location.UNLocode { + return event.Activity.Location +} + +func calculateNextExpectedActivity(d Delivery) HandlingActivity { + if !d.IsOnTrack() { + return HandlingActivity{} + } + + switch d.LastEvent.Activity.Type { + case NotHandled: + return HandlingActivity{Type: Receive, Location: d.RouteSpecification.Origin} + case Receive: + l := d.Itinerary.Legs[0] + return HandlingActivity{Type: Load, Location: l.LoadLocation, VoyageNumber: l.VoyageNumber} + case Load: + for _, l := range d.Itinerary.Legs { + if l.LoadLocation == d.LastEvent.Activity.Location { + return HandlingActivity{Type: Unload, Location: l.UnloadLocation, VoyageNumber: l.VoyageNumber} + } + } + case Unload: + for i, l := range d.Itinerary.Legs { + if l.UnloadLocation == d.LastEvent.Activity.Location { + if i < len(d.Itinerary.Legs)-1 { + return HandlingActivity{Type: Load, Location: d.Itinerary.Legs[i+1].LoadLocation, VoyageNumber: d.Itinerary.Legs[i+1].VoyageNumber} + } + + return HandlingActivity{Type: Claim, Location: l.UnloadLocation} + } + } + } + + return HandlingActivity{} +} + +func calculateCurrentVoyage(transportStatus TransportStatus, event HandlingEvent) voyage.Number { + if transportStatus == OnboardCarrier && event.Activity.Type != NotHandled { + return event.Activity.VoyageNumber + } + + return voyage.Number("") +} + +func calculateETA(d Delivery) time.Time { + if !d.IsOnTrack() { + return time.Time{} + } + + return d.Itinerary.FinalArrivalTime() +} diff --git a/examples/shipping/cargo/handling.go b/examples/shipping/cargo/handling.go new file mode 100644 index 0000000..5f77bc4 --- /dev/null +++ b/examples/shipping/cargo/handling.go @@ -0,0 +1,121 @@ +package cargo + +// TODO: It would make sense to have this in its own package. Unfortunately, +// then there would be a circular dependency between the cargo and handling +// packages since cargo.Delivery would use handling.HandlingEvent and +// handling.HandlingEvent would use cargo.TrackingID. Also, +// HandlingEventFactory depends on the cargo repository. +// +// It would make sense not having the cargo package depend on handling. + +import ( + "errors" + "time" + + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +// HandlingActivity represents how and where a cargo can be handled, and can +// be used to express predictions about what is expected to happen to a cargo +// in the future. +type HandlingActivity struct { + Type HandlingEventType + Location location.UNLocode + VoyageNumber voyage.Number +} + +// HandlingEvent is used to register the event when, for instance, a cargo is +// unloaded from a carrier at a some location at a given time. +type HandlingEvent struct { + TrackingID TrackingID + Activity HandlingActivity +} + +// HandlingEventType describes type of a handling event. +type HandlingEventType int + +// Valid handling event types. +const ( + NotHandled HandlingEventType = iota + Load + Unload + Receive + Claim + Customs +) + +func (t HandlingEventType) String() string { + switch t { + case NotHandled: + return "Not Handled" + case Load: + return "Load" + case Unload: + return "Unload" + case Receive: + return "Receive" + case Claim: + return "Claim" + case Customs: + return "Customs" + } + + return "" +} + +// HandlingHistory is the handling history of a cargo. +type HandlingHistory struct { + HandlingEvents []HandlingEvent +} + +// MostRecentlyCompletedEvent returns most recently completed handling event. +func (h HandlingHistory) MostRecentlyCompletedEvent() (HandlingEvent, error) { + if len(h.HandlingEvents) == 0 { + return HandlingEvent{}, errors.New("delivery history is empty") + } + + return h.HandlingEvents[len(h.HandlingEvents)-1], nil +} + +// HandlingEventRepository provides access a handling event store. +type HandlingEventRepository interface { + Store(e HandlingEvent) + QueryHandlingHistory(TrackingID) HandlingHistory +} + +// HandlingEventFactory creates handling events. +type HandlingEventFactory struct { + CargoRepository Repository + VoyageRepository voyage.Repository + LocationRepository location.Repository +} + +// CreateHandlingEvent creates a validated handling event. +func (f *HandlingEventFactory) CreateHandlingEvent(registrationTime time.Time, completionTime time.Time, trackingID TrackingID, + voyageNumber voyage.Number, unLocode location.UNLocode, eventType HandlingEventType) (HandlingEvent, error) { + + if _, err := f.CargoRepository.Find(trackingID); err != nil { + return HandlingEvent{}, err + } + + if _, err := f.VoyageRepository.Find(voyageNumber); err != nil { + // TODO: This is pretty ugly, but when creating a Receive event, the voyage number is not known. + if len(voyageNumber) > 0 { + return HandlingEvent{}, err + } + } + + if _, err := f.LocationRepository.Find(unLocode); err != nil { + return HandlingEvent{}, err + } + + return HandlingEvent{ + TrackingID: trackingID, + Activity: HandlingActivity{ + Type: eventType, + Location: unLocode, + VoyageNumber: voyageNumber, + }, + }, nil +} diff --git a/examples/shipping/cargo/itinerary.go b/examples/shipping/cargo/itinerary.go new file mode 100644 index 0000000..6b5088e --- /dev/null +++ b/examples/shipping/cargo/itinerary.go @@ -0,0 +1,91 @@ +package cargo + +import ( + "time" + + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +// Leg describes the transportation between two locations on a voyage. +type Leg struct { + VoyageNumber voyage.Number `json:"voyage_number"` + LoadLocation location.UNLocode `json:"from"` + UnloadLocation location.UNLocode `json:"to"` + LoadTime time.Time `json:"load_time"` + UnloadTime time.Time `json:"unload_time"` +} + +// NewLeg creates a new itinerary leg. +func NewLeg(voyageNumber voyage.Number, loadLocation, unloadLocation location.UNLocode, loadTime, unloadTime time.Time) Leg { + return Leg{ + VoyageNumber: voyageNumber, + LoadLocation: loadLocation, + UnloadLocation: unloadLocation, + LoadTime: loadTime, + UnloadTime: unloadTime, + } +} + +// Itinerary specifies steps required to transport a cargo from its origin to +// destination. +type Itinerary struct { + Legs []Leg `json:"legs"` +} + +// InitialDepartureLocation returns the start of the itinerary. +func (i Itinerary) InitialDepartureLocation() location.UNLocode { + if i.IsEmpty() { + return location.UNLocode("") + } + return i.Legs[0].LoadLocation +} + +// FinalArrivalLocation returns the end of the itinerary. +func (i Itinerary) FinalArrivalLocation() location.UNLocode { + if i.IsEmpty() { + return location.UNLocode("") + } + return i.Legs[len(i.Legs)-1].UnloadLocation +} + +// FinalArrivalTime returns the expected arrival time at final destination. +func (i Itinerary) FinalArrivalTime() time.Time { + return i.Legs[len(i.Legs)-1].UnloadTime +} + +// IsEmpty checks if the itinerary contains at least one leg. +func (i Itinerary) IsEmpty() bool { + return i.Legs == nil || len(i.Legs) == 0 +} + +// IsExpected checks if the given handling event is expected when executing +// this itinerary. +func (i Itinerary) IsExpected(event HandlingEvent) bool { + if i.IsEmpty() { + return true + } + + switch event.Activity.Type { + case Receive: + return i.InitialDepartureLocation() == event.Activity.Location + case Load: + for _, l := range i.Legs { + if l.LoadLocation == event.Activity.Location && l.VoyageNumber == event.Activity.VoyageNumber { + return true + } + } + return false + case Unload: + for _, l := range i.Legs { + if l.UnloadLocation == event.Activity.Location && l.VoyageNumber == event.Activity.VoyageNumber { + return true + } + } + return false + case Claim: + return i.FinalArrivalLocation() == event.Activity.Location + } + + return true +} diff --git a/examples/shipping/handling/endpoint.go b/examples/shipping/handling/endpoint.go new file mode 100644 index 0000000..e10bdda --- /dev/null +++ b/examples/shipping/handling/endpoint.go @@ -0,0 +1,34 @@ +package handling + +import ( + "time" + + "golang.org/x/net/context" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +type registerIncidentRequest struct { + ID cargo.TrackingID + Location location.UNLocode + Voyage voyage.Number + EventType cargo.HandlingEventType + CompletionTime time.Time +} + +type registerIncidentResponse struct { + Err error `json:"error,omitempty"` +} + +func (r registerIncidentResponse) error() error { return r.Err } + +func makeRegisterIncidentEndpoint(hs Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(registerIncidentRequest) + err := hs.RegisterHandlingEvent(req.CompletionTime, req.ID, req.Voyage, req.Location, req.EventType) + return registerIncidentResponse{Err: err}, nil + } +} diff --git a/examples/shipping/handling/logging.go b/examples/shipping/handling/logging.go new file mode 100644 index 0000000..26457ac --- /dev/null +++ b/examples/shipping/handling/logging.go @@ -0,0 +1,37 @@ +package handling + +import ( + "time" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" + "github.com/go-kit/kit/log" +) + +type loggingService struct { + logger log.Logger + Service +} + +// NewLoggingService returns a new instance of a logging Service. +func NewLoggingService(logger log.Logger, s Service) Service { + return &loggingService{logger, s} +} + +func (s *loggingService) RegisterHandlingEvent(completionTime time.Time, trackingID cargo.TrackingID, voyageNumber voyage.Number, + unLocode location.UNLocode, eventType cargo.HandlingEventType) (err error) { + defer func(begin time.Time) { + s.logger.Log( + "method", "register_incident", + "tracking_id", trackingID, + "location", unLocode, + "voyage", voyageNumber, + "event_type", eventType, + "completion_time", completionTime, + "took", time.Since(begin), + "err", err, + ) + }(time.Now()) + return s.Service.RegisterHandlingEvent(completionTime, trackingID, voyageNumber, unLocode, eventType) +} diff --git a/examples/shipping/handling/service.go b/examples/shipping/handling/service.go new file mode 100644 index 0000000..f548f4c --- /dev/null +++ b/examples/shipping/handling/service.go @@ -0,0 +1,76 @@ +// Package handling provides the use-case for registering incidents. Used by +// views facing the people handling the cargo along its route. +package handling + +import ( + "errors" + "time" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/inspection" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +// ErrInvalidArgument is returned when one or more arguments are invalid. +var ErrInvalidArgument = errors.New("invalid argument") + +// EventHandler provides a means of subscribing to registered handling events. +type EventHandler interface { + CargoWasHandled(cargo.HandlingEvent) +} + +// Service provides handling operations. +type Service interface { + // RegisterHandlingEvent registers a handling event in the system, and + // notifies interested parties that a cargo has been handled. + RegisterHandlingEvent(completionTime time.Time, trackingID cargo.TrackingID, voyageNumber voyage.Number, + unLocode location.UNLocode, eventType cargo.HandlingEventType) error +} + +type service struct { + handlingEventRepository cargo.HandlingEventRepository + handlingEventFactory cargo.HandlingEventFactory + handlingEventHandler EventHandler +} + +func (s *service) RegisterHandlingEvent(completionTime time.Time, trackingID cargo.TrackingID, voyage voyage.Number, + loc location.UNLocode, eventType cargo.HandlingEventType) error { + if completionTime.IsZero() || trackingID == "" || loc == "" || eventType == cargo.NotHandled { + return ErrInvalidArgument + } + + e, err := s.handlingEventFactory.CreateHandlingEvent(time.Now(), completionTime, trackingID, voyage, loc, eventType) + if err != nil { + return err + } + + s.handlingEventRepository.Store(e) + s.handlingEventHandler.CargoWasHandled(e) + + return nil +} + +// NewService creates a handling event service with necessary dependencies. +func NewService(r cargo.HandlingEventRepository, f cargo.HandlingEventFactory, h EventHandler) Service { + return &service{ + handlingEventRepository: r, + handlingEventFactory: f, + handlingEventHandler: h, + } +} + +type handlingEventHandler struct { + InspectionService inspection.Service +} + +func (h *handlingEventHandler) CargoWasHandled(event cargo.HandlingEvent) { + h.InspectionService.InspectCargo(event.TrackingID) +} + +// NewEventHandler returns a new instance of a EventHandler. +func NewEventHandler(s inspection.Service) EventHandler { + return &handlingEventHandler{ + InspectionService: s, + } +} diff --git a/examples/shipping/handling/transport.go b/examples/shipping/handling/transport.go new file mode 100644 index 0000000..ff9ece9 --- /dev/null +++ b/examples/shipping/handling/transport.go @@ -0,0 +1,100 @@ +package handling + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/gorilla/mux" + "golang.org/x/net/context" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" + kitlog "github.com/go-kit/kit/log" + kithttp "github.com/go-kit/kit/transport/http" +) + +// MakeHandler returns a handler for the handling service. +func MakeHandler(ctx context.Context, hs Service, logger kitlog.Logger) http.Handler { + r := mux.NewRouter() + + opts := []kithttp.ServerOption{ + kithttp.ServerErrorLogger(logger), + kithttp.ServerErrorEncoder(encodeError), + } + + registerIncidentHandler := kithttp.NewServer( + ctx, + makeRegisterIncidentEndpoint(hs), + decodeRegisterIncidentRequest, + encodeResponse, + opts..., + ) + + r.Handle("/handling/v1/incidents", registerIncidentHandler).Methods("POST") + + return r +} + +func decodeRegisterIncidentRequest(r *http.Request) (interface{}, error) { + var body struct { + CompletionTime time.Time `json:"completion_time"` + TrackingID string `json:"tracking_id"` + VoyageNumber string `json:"voyage"` + Location string `json:"location"` + EventType string `json:"event_type"` + } + + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + return nil, err + } + + return registerIncidentRequest{ + CompletionTime: body.CompletionTime, + ID: cargo.TrackingID(body.TrackingID), + Voyage: voyage.Number(body.VoyageNumber), + Location: location.UNLocode(body.Location), + EventType: stringToEventType(body.EventType), + }, nil +} + +func stringToEventType(s string) cargo.HandlingEventType { + types := map[string]cargo.HandlingEventType{ + cargo.Receive.String(): cargo.Receive, + cargo.Load.String(): cargo.Load, + cargo.Unload.String(): cargo.Unload, + cargo.Customs.String(): cargo.Customs, + cargo.Claim.String(): cargo.Claim, + } + return types[s] +} + +func encodeResponse(w http.ResponseWriter, response interface{}) error { + if e, ok := response.(errorer); ok && e.error() != nil { + encodeError(w, e.error()) + return nil + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) +} + +type errorer interface { + error() error +} + +// encode errors from business-logic +func encodeError(w http.ResponseWriter, err error) { + switch err { + case cargo.ErrUnknown: + w.WriteHeader(http.StatusNotFound) + case ErrInvalidArgument: + w.WriteHeader(http.StatusBadRequest) + default: + w.WriteHeader(http.StatusInternalServerError) + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + json.NewEncoder(w).Encode(map[string]interface{}{ + "error": err.Error(), + }) +} diff --git a/examples/shipping/inspection/inspection.go b/examples/shipping/inspection/inspection.go new file mode 100644 index 0000000..a3f7147 --- /dev/null +++ b/examples/shipping/inspection/inspection.go @@ -0,0 +1,51 @@ +// Package inspection provides means to inspect cargos. +package inspection + +import "github.com/go-kit/kit/examples/shipping/cargo" + +// EventHandler provides means of subscribing to inspection events. +type EventHandler interface { + CargoWasMisdirected(*cargo.Cargo) + CargoHasArrived(*cargo.Cargo) +} + +// Service provides cargo inspection operations. +type Service interface { + // InspectCargo inspects cargo and send relevant notifications to + // interested parties, for example if a cargo has been misdirected, or + // unloaded at the final destination. + InspectCargo(trackingID cargo.TrackingID) +} + +type service struct { + cargoRepository cargo.Repository + handlingEventRepository cargo.HandlingEventRepository + cargoEventHandler EventHandler +} + +// TODO: Should be transactional +func (s *service) InspectCargo(trackingID cargo.TrackingID) { + c, err := s.cargoRepository.Find(trackingID) + if err != nil { + return + } + + h := s.handlingEventRepository.QueryHandlingHistory(trackingID) + + c.DeriveDeliveryProgress(h) + + if c.Delivery.IsMisdirected { + s.cargoEventHandler.CargoWasMisdirected(c) + } + + if c.Delivery.IsUnloadedAtDestination { + s.cargoEventHandler.CargoHasArrived(c) + } + + s.cargoRepository.Store(c) +} + +// NewService creates a inspection service with necessary dependencies. +func NewService(cargoRepository cargo.Repository, handlingEventRepository cargo.HandlingEventRepository, eventHandler EventHandler) Service { + return &service{cargoRepository, handlingEventRepository, eventHandler} +} diff --git a/examples/shipping/location/location.go b/examples/shipping/location/location.go new file mode 100644 index 0000000..5129380 --- /dev/null +++ b/examples/shipping/location/location.go @@ -0,0 +1,27 @@ +// Package location provides the Location aggregate. +package location + +import "errors" + +// UNLocode is the United Nations location code that uniquely identifies a +// particular location. +// +// http://www.unece.org/cefact/locode/ +// http://www.unece.org/cefact/locode/DocColumnDescription.htm#LOCODE +type UNLocode string + +// Location is a location is our model is stops on a journey, such as cargo +// origin or destination, or carrier movement endpoints. +type Location struct { + UNLocode UNLocode + Name string +} + +// ErrUnknown is used when a location could not be found. +var ErrUnknown = errors.New("unknown location") + +// Repository provides access a location store. +type Repository interface { + Find(locode UNLocode) (Location, error) + FindAll() []Location +} diff --git a/examples/shipping/location/sample_locations.go b/examples/shipping/location/sample_locations.go new file mode 100644 index 0000000..de0d4c1 --- /dev/null +++ b/examples/shipping/location/sample_locations.go @@ -0,0 +1,27 @@ +package location + +// Sample UN locodes. +var ( + SESTO UNLocode = "SESTO" + AUMEL UNLocode = "AUMEL" + CNHKG UNLocode = "CNHKG" + USNYC UNLocode = "USNYC" + USCHI UNLocode = "USCHI" + JNTKO UNLocode = "JNTKO" + DEHAM UNLocode = "DEHAM" + NLRTM UNLocode = "NLRTM" + FIHEL UNLocode = "FIHEL" +) + +// Sample locations. +var ( + Stockholm = Location{SESTO, "Stockholm"} + Melbourne = Location{AUMEL, "Melbourne"} + Hongkong = Location{CNHKG, "Hongkong"} + NewYork = Location{USNYC, "New York"} + Chicago = Location{USCHI, "Chicago"} + Tokyo = Location{JNTKO, "Tokyo"} + Hamburg = Location{DEHAM, "Hamburg"} + Rotterdam = Location{NLRTM, "Rotterdam"} + Helsinki = Location{FIHEL, "Helsinki"} +) diff --git a/examples/shipping/main.go b/examples/shipping/main.go new file mode 100644 index 0000000..cd4e624 --- /dev/null +++ b/examples/shipping/main.go @@ -0,0 +1,157 @@ +package main + +import ( + "flag" + "fmt" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/go-kit/kit/log" + "golang.org/x/net/context" + + "github.com/go-kit/kit/examples/shipping/booking" + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/handling" + "github.com/go-kit/kit/examples/shipping/inspection" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/repository" + "github.com/go-kit/kit/examples/shipping/routing" + "github.com/go-kit/kit/examples/shipping/tracking" +) + +const ( + defaultPort = "8080" + defaultRoutingServiceURL = "http://localhost:7878" +) + +func main() { + var ( + addr = envString("PORT", defaultPort) + rsurl = envString("ROUTINGSERVICE_URL", defaultRoutingServiceURL) + + httpAddr = flag.String("http.addr", ":"+addr, "HTTP listen address") + routingServiceURL = flag.String("service.routing", rsurl, "routing service URL") + + ctx = context.Background() + ) + + flag.Parse() + + var logger log.Logger + logger = log.NewLogfmtLogger(os.Stderr) + logger = &serializedLogger{Logger: logger} + logger = log.NewContext(logger).With("ts", log.DefaultTimestampUTC) + + var ( + cargos = repository.NewCargo() + locations = repository.NewLocation() + voyages = repository.NewVoyage() + handlingEvents = repository.NewHandlingEvent() + ) + + // Configure some questionable dependencies. + var ( + handlingEventFactory = cargo.HandlingEventFactory{ + CargoRepository: cargos, + VoyageRepository: voyages, + LocationRepository: locations, + } + handlingEventHandler = handling.NewEventHandler( + inspection.NewService(cargos, handlingEvents, nil), + ) + ) + + // Facilitate testing by adding some cargos. + storeTestData(cargos) + + var rs routing.Service + rs = routing.NewProxyingMiddleware(*routingServiceURL, ctx)(rs) + + var bs booking.Service + bs = booking.NewService(cargos, locations, handlingEvents, rs) + bs = booking.NewLoggingService(log.NewContext(logger).With("component", "booking"), bs) + + var ts tracking.Service + ts = tracking.NewService(cargos, handlingEvents) + ts = tracking.NewLoggingService(log.NewContext(logger).With("component", "tracking"), ts) + + var hs handling.Service + hs = handling.NewService(handlingEvents, handlingEventFactory, handlingEventHandler) + hs = handling.NewLoggingService(log.NewContext(logger).With("component", "handling"), hs) + + httpLogger := log.NewContext(logger).With("component", "http") + + mux := http.NewServeMux() + + mux.Handle("/booking/v1/", booking.MakeHandler(ctx, bs, httpLogger)) + mux.Handle("/tracking/v1/", tracking.MakeHandler(ctx, ts, httpLogger)) + mux.Handle("/handling/v1/", handling.MakeHandler(ctx, hs, httpLogger)) + + http.Handle("/", accessControl(mux)) + + errs := make(chan error, 2) + go func() { + logger.Log("transport", "http", "address", *httpAddr, "msg", "listening") + errs <- http.ListenAndServe(*httpAddr, nil) + }() + go func() { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT) + errs <- fmt.Errorf("%s", <-c) + }() + + logger.Log("terminated", <-errs) +} + +func accessControl(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Origin, Content-Type") + + if r.Method == "OPTIONS" { + return + } + + h.ServeHTTP(w, r) + }) +} + +func envString(env, fallback string) string { + e := os.Getenv(env) + if e == "" { + return fallback + } + return e +} + +func storeTestData(r cargo.Repository) { + test1 := cargo.New("FTL456", cargo.RouteSpecification{ + Origin: location.AUMEL, + Destination: location.SESTO, + ArrivalDeadline: time.Now().AddDate(0, 0, 7), + }) + _ = r.Store(test1) + + test2 := cargo.New("ABC123", cargo.RouteSpecification{ + Origin: location.SESTO, + Destination: location.CNHKG, + ArrivalDeadline: time.Now().AddDate(0, 0, 14), + }) + _ = r.Store(test2) +} + +type serializedLogger struct { + mtx sync.Mutex + log.Logger +} + +func (l *serializedLogger) Log(keyvals ...interface{}) error { + l.mtx.Lock() + defer l.mtx.Unlock() + return l.Logger.Log(keyvals...) +} diff --git a/examples/shipping/repository/repositories.go b/examples/shipping/repository/repositories.go new file mode 100644 index 0000000..714d0a8 --- /dev/null +++ b/examples/shipping/repository/repositories.go @@ -0,0 +1,142 @@ +// Package repository provides implementations of all the domain repositories. +package repository + +import ( + "sync" + + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" +) + +type cargoRepository struct { + mtx sync.RWMutex + cargos map[cargo.TrackingID]*cargo.Cargo +} + +func (r *cargoRepository) Store(c *cargo.Cargo) error { + r.mtx.Lock() + defer r.mtx.Unlock() + r.cargos[c.TrackingID] = c + return nil +} + +func (r *cargoRepository) Find(trackingID cargo.TrackingID) (*cargo.Cargo, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + if val, ok := r.cargos[trackingID]; ok { + return val, nil + } + return nil, cargo.ErrUnknown +} + +func (r *cargoRepository) FindAll() []*cargo.Cargo { + r.mtx.RLock() + defer r.mtx.RUnlock() + c := make([]*cargo.Cargo, 0, len(r.cargos)) + for _, val := range r.cargos { + c = append(c, val) + } + return c +} + +// NewCargo returns a new instance of a in-memory cargo repository. +func NewCargo() cargo.Repository { + return &cargoRepository{ + cargos: make(map[cargo.TrackingID]*cargo.Cargo), + } +} + +type locationRepository struct { + locations map[location.UNLocode]location.Location +} + +func (r *locationRepository) Find(locode location.UNLocode) (location.Location, error) { + if l, ok := r.locations[locode]; ok { + return l, nil + } + return location.Location{}, location.ErrUnknown +} + +func (r *locationRepository) FindAll() []location.Location { + l := make([]location.Location, 0, len(r.locations)) + for _, val := range r.locations { + l = append(l, val) + } + return l +} + +// NewLocation returns a new instance of a in-memory location repository. +func NewLocation() location.Repository { + r := &locationRepository{ + locations: make(map[location.UNLocode]location.Location), + } + + r.locations[location.SESTO] = location.Stockholm + r.locations[location.AUMEL] = location.Melbourne + r.locations[location.CNHKG] = location.Hongkong + r.locations[location.JNTKO] = location.Tokyo + r.locations[location.NLRTM] = location.Rotterdam + r.locations[location.DEHAM] = location.Hamburg + + return r +} + +type voyageRepository struct { + voyages map[voyage.Number]*voyage.Voyage +} + +func (r *voyageRepository) Find(voyageNumber voyage.Number) (*voyage.Voyage, error) { + if v, ok := r.voyages[voyageNumber]; ok { + return v, nil + } + + return nil, voyage.ErrUnknown +} + +// NewVoyage returns a new instance of a in-memory voyage repository. +func NewVoyage() voyage.Repository { + r := &voyageRepository{ + voyages: make(map[voyage.Number]*voyage.Voyage), + } + + r.voyages[voyage.V100.Number] = voyage.V100 + r.voyages[voyage.V300.Number] = voyage.V300 + r.voyages[voyage.V400.Number] = voyage.V400 + + r.voyages[voyage.V0100S.Number] = voyage.V0100S + r.voyages[voyage.V0200T.Number] = voyage.V0200T + r.voyages[voyage.V0300A.Number] = voyage.V0300A + r.voyages[voyage.V0301S.Number] = voyage.V0301S + r.voyages[voyage.V0400S.Number] = voyage.V0400S + + return r +} + +type handlingEventRepository struct { + mtx sync.RWMutex + events map[cargo.TrackingID][]cargo.HandlingEvent +} + +func (r *handlingEventRepository) Store(e cargo.HandlingEvent) { + r.mtx.Lock() + defer r.mtx.Unlock() + // Make array if it's the first event with this tracking ID. + if _, ok := r.events[e.TrackingID]; !ok { + r.events[e.TrackingID] = make([]cargo.HandlingEvent, 0) + } + r.events[e.TrackingID] = append(r.events[e.TrackingID], e) +} + +func (r *handlingEventRepository) QueryHandlingHistory(trackingID cargo.TrackingID) cargo.HandlingHistory { + r.mtx.RLock() + defer r.mtx.RUnlock() + return cargo.HandlingHistory{HandlingEvents: r.events[trackingID]} +} + +// NewHandlingEvent returns a new instance of a in-memory handling event repository. +func NewHandlingEvent() cargo.HandlingEventRepository { + return &handlingEventRepository{ + events: make(map[cargo.TrackingID][]cargo.HandlingEvent), + } +} diff --git a/examples/shipping/routing/proxying.go b/examples/shipping/routing/proxying.go new file mode 100644 index 0000000..f9e536a --- /dev/null +++ b/examples/shipping/routing/proxying.go @@ -0,0 +1,117 @@ +package routing + +import ( + "encoding/json" + "net/http" + "net/url" + "time" + + "golang.org/x/net/context" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/examples/shipping/cargo" + "github.com/go-kit/kit/examples/shipping/location" + "github.com/go-kit/kit/examples/shipping/voyage" + kithttp "github.com/go-kit/kit/transport/http" +) + +type proxyService struct { + context.Context + FetchRoutesEndpoint endpoint.Endpoint + Service +} + +func (s proxyService) FetchRoutesForSpecification(rs cargo.RouteSpecification) []cargo.Itinerary { + response, err := s.FetchRoutesEndpoint(s.Context, fetchRoutesRequest{ + From: string(rs.Origin), + To: string(rs.Destination), + }) + if err != nil { + return []cargo.Itinerary{} + } + + resp := response.(fetchRoutesResponse) + + var itineraries []cargo.Itinerary + for _, r := range resp.Paths { + var legs []cargo.Leg + for _, e := range r.Edges { + legs = append(legs, cargo.Leg{ + VoyageNumber: voyage.Number(e.Voyage), + LoadLocation: location.UNLocode(e.Origin), + UnloadLocation: location.UNLocode(e.Destination), + LoadTime: e.Departure, + UnloadTime: e.Arrival, + }) + } + + itineraries = append(itineraries, cargo.Itinerary{Legs: legs}) + } + + return itineraries +} + +// ServiceMiddleware defines a middleware for a routing service. +type ServiceMiddleware func(Service) Service + +// NewProxyingMiddleware returns a new instance of a proxying middleware. +func NewProxyingMiddleware(proxyURL string, ctx context.Context) ServiceMiddleware { + return func(next Service) Service { + var e endpoint.Endpoint + e = makeFetchRoutesEndpoint(ctx, proxyURL) + e = circuitbreaker.Hystrix("fetch-routes")(e) + return proxyService{ctx, e, next} + } +} + +type fetchRoutesRequest struct { + From string + To string +} + +type fetchRoutesResponse struct { + Paths []struct { + Edges []struct { + Origin string `json:"origin"` + Destination string `json:"destination"` + Voyage string `json:"voyage"` + Departure time.Time `json:"departure"` + Arrival time.Time `json:"arrival"` + } `json:"edges"` + } `json:"paths"` +} + +func makeFetchRoutesEndpoint(ctx context.Context, instance string) endpoint.Endpoint { + u, err := url.Parse(instance) + if err != nil { + panic(err) + } + if u.Path == "" { + u.Path = "/paths" + } + return kithttp.NewClient( + "GET", u, + encodeFetchRoutesRequest, + decodeFetchRoutesResponse, + ).Endpoint() +} + +func decodeFetchRoutesResponse(resp *http.Response) (interface{}, error) { + var response fetchRoutesResponse + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, err + } + return response, nil +} + +func encodeFetchRoutesRequest(r *http.Request, request interface{}) error { + req := request.(fetchRoutesRequest) + + vals := r.URL.Query() + vals.Add("from", req.From) + vals.Add("to", req.To) + r.URL.RawQuery = vals.Encode() + + return nil +} diff --git a/examples/shipping/routing/routing.go b/examples/shipping/routing/routing.go new file mode 100644 index 0000000..dac3690 --- /dev/null +++ b/examples/shipping/routing/routing.go @@ -0,0 +1,13 @@ +// Package routing provides the routing domain service. It does not actually +// implement the routing service but merely acts as a proxy for a separate +// bounded context. +package routing + +import "github.com/go-kit/kit/examples/shipping/cargo" + +// Service provides access to an external routing service. +type Service interface { + // FetchRoutesForSpecification finds all possible routes that satisfy a + // given specification. + FetchRoutesForSpecification(rs cargo.RouteSpecification) []cargo.Itinerary +} diff --git a/examples/shipping/tracking/endpoint.go b/examples/shipping/tracking/endpoint.go new file mode 100644 index 0000000..ea105d5 --- /dev/null +++ b/examples/shipping/tracking/endpoint.go @@ -0,0 +1,26 @@ +package tracking + +import ( + "golang.org/x/net/context" + + "github.com/go-kit/kit/endpoint" +) + +type trackCargoRequest struct { + ID string +} + +type trackCargoResponse struct { + Cargo *Cargo `json:"cargo,omitempty"` + Err error `json:"error,omitempty"` +} + +func (r trackCargoResponse) error() error { return r.Err } + +func makeTrackCargoEndpoint(ts Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(trackCargoRequest) + c, err := ts.Track(req.ID) + return trackCargoResponse{Cargo: &c, Err: err}, nil + } +} diff --git a/examples/shipping/tracking/logging.go b/examples/shipping/tracking/logging.go new file mode 100644 index 0000000..584aeaa --- /dev/null +++ b/examples/shipping/tracking/logging.go @@ -0,0 +1,24 @@ +package tracking + +import ( + "time" + + "github.com/go-kit/kit/log" +) + +type loggingService struct { + logger log.Logger + Service +} + +// NewLoggingService returns a new instance of a logging Service. +func NewLoggingService(logger log.Logger, s Service) Service { + return &loggingService{logger, s} +} + +func (s *loggingService) Track(id string) (c Cargo, err error) { + defer func(begin time.Time) { + s.logger.Log("method", "track", "tracking_id", id, "took", time.Since(begin), "err", err) + }(time.Now()) + return s.Service.Track(id) +} diff --git a/examples/shipping/tracking/service.go b/examples/shipping/tracking/service.go new file mode 100644 index 0000000..d5b9273 --- /dev/null +++ b/examples/shipping/tracking/service.go @@ -0,0 +1,163 @@ +// Package tracking provides the use-case of tracking a cargo. Used by views +// facing the end-user. +package tracking + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/go-kit/kit/examples/shipping/cargo" +) + +// ErrInvalidArgument is returned when one or more arguments are invalid. +var ErrInvalidArgument = errors.New("invalid argument") + +// Service is the interface that provides the basic Track method. +type Service interface { + // Track returns a cargo matching a tracking ID. + Track(id string) (Cargo, error) +} + +type service struct { + cargos cargo.Repository + handlingEvents cargo.HandlingEventRepository +} + +func (s *service) Track(id string) (Cargo, error) { + if id == "" { + return Cargo{}, ErrInvalidArgument + } + c, err := s.cargos.Find(cargo.TrackingID(id)) + if err != nil { + return Cargo{}, err + } + return assemble(c, s.handlingEvents), nil +} + +// NewService returns a new instance of the default Service. +func NewService(cargos cargo.Repository, handlingEvents cargo.HandlingEventRepository) Service { + return &service{ + cargos: cargos, + handlingEvents: handlingEvents, + } +} + +// Cargo is a read model for tracking views. +type Cargo struct { + TrackingID string `json:"tracking_id"` + StatusText string `json:"status_text"` + Origin string `json:"origin"` + Destination string `json:"destination"` + ETA time.Time `json:"eta"` + NextExpectedActivity string `json:"next_expected_activity"` + ArrivalDeadline time.Time `json:"arrival_deadline"` + Events []Event `json:"events"` +} + +// Leg is a read model for booking views. +type Leg struct { + VoyageNumber string `json:"voyage_number"` + From string `json:"from"` + To string `json:"to"` + LoadTime time.Time `json:"load_time"` + UnloadTime time.Time `json:"unload_time"` +} + +// Event is a read model for tracking views. +type Event struct { + Description string `json:"description"` + Expected bool `json:"expected"` +} + +func assemble(c *cargo.Cargo, her cargo.HandlingEventRepository) Cargo { + return Cargo{ + TrackingID: string(c.TrackingID), + Origin: string(c.Origin), + Destination: string(c.RouteSpecification.Destination), + ETA: c.Delivery.ETA, + NextExpectedActivity: nextExpectedActivity(c), + ArrivalDeadline: c.RouteSpecification.ArrivalDeadline, + StatusText: assembleStatusText(c), + Events: assembleEvents(c, her), + } +} + +func assembleLegs(c cargo.Cargo) []Leg { + var legs []Leg + for _, l := range c.Itinerary.Legs { + legs = append(legs, Leg{ + VoyageNumber: string(l.VoyageNumber), + From: string(l.LoadLocation), + To: string(l.UnloadLocation), + LoadTime: l.LoadTime, + UnloadTime: l.UnloadTime, + }) + } + return legs +} + +func nextExpectedActivity(c *cargo.Cargo) string { + a := c.Delivery.NextExpectedActivity + prefix := "Next expected activity is to" + + switch a.Type { + case cargo.Load: + return fmt.Sprintf("%s %s cargo onto voyage %s in %s.", prefix, strings.ToLower(a.Type.String()), a.VoyageNumber, a.Location) + case cargo.Unload: + return fmt.Sprintf("%s %s cargo off of voyage %s in %s.", prefix, strings.ToLower(a.Type.String()), a.VoyageNumber, a.Location) + case cargo.NotHandled: + return "There are currently no expected activities for this cargo." + } + + return fmt.Sprintf("%s %s cargo in %s.", prefix, strings.ToLower(a.Type.String()), a.Location) +} + +func assembleStatusText(c *cargo.Cargo) string { + switch c.Delivery.TransportStatus { + case cargo.NotReceived: + return "Not received" + case cargo.InPort: + return fmt.Sprintf("In port %s", c.Delivery.LastKnownLocation) + case cargo.OnboardCarrier: + return fmt.Sprintf("Onboard voyage %s", c.Delivery.CurrentVoyage) + case cargo.Claimed: + return "Claimed" + default: + return "Unknown" + } +} + +func assembleEvents(c *cargo.Cargo, r cargo.HandlingEventRepository) []Event { + h := r.QueryHandlingHistory(c.TrackingID) + + var events []Event + for _, e := range h.HandlingEvents { + var description string + + switch e.Activity.Type { + case cargo.NotHandled: + description = "Cargo has not yet been received." + case cargo.Receive: + description = fmt.Sprintf("Received in %s, at %s", e.Activity.Location, time.Now().Format(time.RFC3339)) + case cargo.Load: + description = fmt.Sprintf("Loaded onto voyage %s in %s, at %s.", e.Activity.VoyageNumber, e.Activity.Location, time.Now().Format(time.RFC3339)) + case cargo.Unload: + description = fmt.Sprintf("Unloaded off voyage %s in %s, at %s.", e.Activity.VoyageNumber, e.Activity.Location, time.Now().Format(time.RFC3339)) + case cargo.Claim: + description = fmt.Sprintf("Claimed in %s, at %s.", e.Activity.Location, time.Now().Format(time.RFC3339)) + case cargo.Customs: + description = fmt.Sprintf("Cleared customs in %s, at %s.", e.Activity.Location, time.Now().Format(time.RFC3339)) + default: + description = "[Unknown status]" + } + + events = append(events, Event{ + Description: description, + Expected: c.Itinerary.IsExpected(e), + }) + } + + return events +} diff --git a/examples/shipping/tracking/transport.go b/examples/shipping/tracking/transport.go new file mode 100644 index 0000000..428db89 --- /dev/null +++ b/examples/shipping/tracking/transport.go @@ -0,0 +1,74 @@ +package tracking + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/gorilla/mux" + "golang.org/x/net/context" + + "github.com/go-kit/kit/examples/shipping/cargo" + kitlog "github.com/go-kit/kit/log" + kithttp "github.com/go-kit/kit/transport/http" +) + +// MakeHandler returns a handler for the tracking service. +func MakeHandler(ctx context.Context, ts Service, logger kitlog.Logger) http.Handler { + r := mux.NewRouter() + + opts := []kithttp.ServerOption{ + kithttp.ServerErrorLogger(logger), + kithttp.ServerErrorEncoder(encodeError), + } + + trackCargoHandler := kithttp.NewServer( + ctx, + makeTrackCargoEndpoint(ts), + decodeTrackCargoRequest, + encodeResponse, + opts..., + ) + + r.Handle("/tracking/v1/cargos/{id}", trackCargoHandler).Methods("GET") + + return r +} + +func decodeTrackCargoRequest(r *http.Request) (interface{}, error) { + vars := mux.Vars(r) + id, ok := vars["id"] + if !ok { + return nil, errors.New("bad route") + } + return trackCargoRequest{ID: id}, nil +} + +func encodeResponse(w http.ResponseWriter, response interface{}) error { + if e, ok := response.(errorer); ok && e.error() != nil { + encodeError(w, e.error()) + return nil + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) +} + +type errorer interface { + error() error +} + +// encode errors from business-logic +func encodeError(w http.ResponseWriter, err error) { + switch err { + case cargo.ErrUnknown: + w.WriteHeader(http.StatusNotFound) + case ErrInvalidArgument: + w.WriteHeader(http.StatusBadRequest) + default: + w.WriteHeader(http.StatusInternalServerError) + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + json.NewEncoder(w).Encode(map[string]interface{}{ + "error": err.Error(), + }) +} diff --git a/examples/shipping/voyage/sample_voyages.go b/examples/shipping/voyage/sample_voyages.go new file mode 100644 index 0000000..51b7a05 --- /dev/null +++ b/examples/shipping/voyage/sample_voyages.go @@ -0,0 +1,40 @@ +package voyage + +import "github.com/go-kit/kit/examples/shipping/location" + +// A set of sample voyages. +var ( + V100 = New("V100", Schedule{ + []CarrierMovement{ + {DepartureLocation: location.Hongkong, ArrivalLocation: location.Tokyo}, + {DepartureLocation: location.Tokyo, ArrivalLocation: location.NewYork}, + }, + }) + + V300 = New("V300", Schedule{ + []CarrierMovement{ + {DepartureLocation: location.Tokyo, ArrivalLocation: location.Rotterdam}, + {DepartureLocation: location.Rotterdam, ArrivalLocation: location.Hamburg}, + {DepartureLocation: location.Hamburg, ArrivalLocation: location.Melbourne}, + {DepartureLocation: location.Melbourne, ArrivalLocation: location.Tokyo}, + }, + }) + + V400 = New("V400", Schedule{ + []CarrierMovement{ + {DepartureLocation: location.Hamburg, ArrivalLocation: location.Stockholm}, + {DepartureLocation: location.Stockholm, ArrivalLocation: location.Helsinki}, + {DepartureLocation: location.Helsinki, ArrivalLocation: location.Hamburg}, + }, + }) +) + +// These voyages are hard-coded into the current pathfinder. Make sure +// they exist. +var ( + V0100S = New("0100S", Schedule{[]CarrierMovement{}}) + V0200T = New("0200T", Schedule{[]CarrierMovement{}}) + V0300A = New("0300A", Schedule{[]CarrierMovement{}}) + V0301S = New("0301S", Schedule{[]CarrierMovement{}}) + V0400S = New("0400S", Schedule{[]CarrierMovement{}}) +) diff --git a/examples/shipping/voyage/voyage.go b/examples/shipping/voyage/voyage.go new file mode 100644 index 0000000..57a70b0 --- /dev/null +++ b/examples/shipping/voyage/voyage.go @@ -0,0 +1,44 @@ +// Package voyage provides the Voyage aggregate. +package voyage + +import ( + "errors" + "time" + + "github.com/go-kit/kit/examples/shipping/location" +) + +// Number uniquely identifies a particular Voyage. +type Number string + +// Voyage is a uniquely identifiable series of carrier movements. +type Voyage struct { + Number Number + Schedule Schedule +} + +// New creates a voyage with a voyage number and a provided schedule. +func New(n Number, s Schedule) *Voyage { + return &Voyage{Number: n, Schedule: s} +} + +// Schedule describes a voyage schedule. +type Schedule struct { + CarrierMovements []CarrierMovement +} + +// CarrierMovement is a vessel voyage from one location to another. +type CarrierMovement struct { + DepartureLocation location.Location + ArrivalLocation location.Location + DepartureTime time.Time + ArrivalTime time.Time +} + +// ErrUnknown is used when a voyage could not be found. +var ErrUnknown = errors.New("unknown voyage") + +// Repository provides access a voyage store. +type Repository interface { + Find(Number) (*Voyage, error) +} diff --git a/transport/grpc/client.go b/transport/grpc/client.go index acc3189..533c8e2 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -2,6 +2,7 @@ import ( "fmt" + "reflect" "golang.org/x/net/context" "google.golang.org/grpc" @@ -18,7 +19,7 @@ method string enc EncodeRequestFunc dec DecodeResponseFunc - grpcReply interface{} + grpcReply reflect.Type before []RequestFunc } @@ -33,12 +34,20 @@ options ...ClientOption, ) *Client { c := &Client{ - client: cc, - method: fmt.Sprintf("/pb.%s/%s", serviceName, method), - enc: enc, - dec: dec, - grpcReply: grpcReply, - before: []RequestFunc{}, + client: cc, + method: fmt.Sprintf("/pb.%s/%s", serviceName, method), + enc: enc, + dec: dec, + // We are using reflect.Indirect here to allow both reply structs and + // pointers to these reply structs. New consumers of the client should + // use structs directly, while existing consumers will not break if they + // remain to use pointers to structs. + grpcReply: reflect.TypeOf( + reflect.Indirect( + reflect.ValueOf(grpcReply), + ).Interface(), + ), + before: []RequestFunc{}, } for _, option := range options { option(c) @@ -73,11 +82,12 @@ } ctx = metadata.NewContext(ctx, *md) - if err = grpc.Invoke(ctx, c.method, req, c.grpcReply, c.client); err != nil { + grpcReply := reflect.New(c.grpcReply).Interface() + if err = grpc.Invoke(ctx, c.method, req, grpcReply, c.client); err != nil { return nil, fmt.Errorf("Invoke: %v", err) } - response, err := c.dec(ctx, c.grpcReply) + response, err := c.dec(ctx, grpcReply) if err != nil { return nil, fmt.Errorf("Decode: %v", err) } diff --git a/transport/httprp/README.md b/transport/httprp/README.md new file mode 100644 index 0000000..23853e2 --- /dev/null +++ b/transport/httprp/README.md @@ -0,0 +1,48 @@ +# package transport/httprp + +`package transport/httprp` provides an HTTP reverse-proxy transport. + +## Rationale + +HTTP server applications often associate multiple handlers with a single HTTP listener, each handler differentiated by the request URI and/or HTTP method. Handlers that perform business-logic in the app can implement the `Endpoint` interface and be exposed using the `package transport/http` server. Handlers that need to proxy the request to another HTTP endpoint can do so with this package by simply specifying the base URL to forward the request to. + +## Usage + +The following example uses the [Gorilla Mux](https://github.com/gorilla/mux) router to illustrate how a mixture of proxying and non-proxying request handlers can be used with a single listener: + +```go +import ( + "net/http" + "net/url" + + kithttp "github.com/go-kit/kit/transport/http" + kithttprp "github.com/go-kit/kit/transport/httprp" + "github.com/gorilla/mux" + "golang.org/x/net/context" +) + +func main() { + router := mux.NewRouter() + + // server HTTP endpoint handled here + router.Handle("/foo", + kithttp.NewServer( + context.Background(), + func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + func(*http.Request) (interface{}, error) { return struct{}{}, nil }, + func(http.ResponseWriter, interface{}) error { return nil }, + )).Methods("GET") + + // proxy endpoint, forwards requests to http://other.service.local/base/bar + remoteServiceURL, _ := url.Parse("http://other.service.local/base") + router.Handle("/bar", + kithttprp.NewServer( + context.Background(), + remoteServiceURL, + )).Methods("GET") + + http.ListenAndServe(":8080", router) +} +``` + +You can also supply a set of `RequestFunc` functions to be run before proxying the request. This can be useful for adding request headers required by the backend system (e.g. API tokens). diff --git a/transport/httprp/server.go b/transport/httprp/server.go new file mode 100644 index 0000000..d34e22b --- /dev/null +++ b/transport/httprp/server.go @@ -0,0 +1,62 @@ +package httprp + +import ( + "net/http" + "net/http/httputil" + "net/url" + + "golang.org/x/net/context" +) + +// RequestFunc may take information from an HTTP request and put it into a +// request context. BeforeFuncs are executed prior to invoking the +// endpoint. +type RequestFunc func(context.Context, *http.Request) context.Context + +// Server is a proxying request handler. +type Server struct { + ctx context.Context + proxy http.Handler + before []RequestFunc + errorEncoder func(w http.ResponseWriter, err error) +} + +// NewServer constructs a new server that implements http.Server and will proxy +// requests to the given base URL using its scheme, host, and base path. +// If the target's path is "/base" and the incoming request was for "/dir", +// the target request will be for /base/dir. +func NewServer( + ctx context.Context, + baseURL *url.URL, + options ...ServerOption, +) *Server { + s := &Server{ + ctx: ctx, + proxy: httputil.NewSingleHostReverseProxy(baseURL), + } + for _, option := range options { + option(s) + } + return s +} + +// ServerOption sets an optional parameter for servers. +type ServerOption func(*Server) + +// ServerBefore functions are executed on the HTTP request object before the +// request is decoded. +func ServerBefore(before ...RequestFunc) ServerOption { + return func(s *Server) { s.before = before } +} + +// ServeHTTP implements http.Handler. +func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithCancel(s.ctx) + defer cancel() + + for _, f := range s.before { + ctx = f(ctx, r) + } + + s.proxy.ServeHTTP(w, r) +} diff --git a/transport/httprp/server_test.go b/transport/httprp/server_test.go new file mode 100644 index 0000000..06946a7 --- /dev/null +++ b/transport/httprp/server_test.go @@ -0,0 +1,120 @@ +package httprp_test + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "golang.org/x/net/context" + + httptransport "github.com/go-kit/kit/transport/httprp" +) + +func TestServerHappyPathSingleServer(t *testing.T) { + originServer := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("hey")) + })) + defer originServer.Close() + originURL, _ := url.Parse(originServer.URL) + + handler := httptransport.NewServer( + context.Background(), + originURL, + ) + proxyServer := httptest.NewServer(handler) + defer proxyServer.Close() + + resp, _ := http.Get(proxyServer.URL) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + + responseBody, _ := ioutil.ReadAll(resp.Body) + if want, have := "hey", string(responseBody); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestServerHappyPathSingleServerWithServerOptions(t *testing.T) { + const ( + headerKey = "X-TEST-HEADER" + headerVal = "go-kit-proxy" + ) + + originServer := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if want, have := headerVal, r.Header.Get(headerKey); want != have { + t.Errorf("want %d, have %d", want, have) + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("hey")) + })) + defer originServer.Close() + originURL, _ := url.Parse(originServer.URL) + + handler := httptransport.NewServer( + context.Background(), + originURL, + httptransport.ServerBefore(func(ctx context.Context, r *http.Request) context.Context { + r.Header.Add(headerKey, headerVal) + return ctx + }), + ) + proxyServer := httptest.NewServer(handler) + defer proxyServer.Close() + + resp, _ := http.Get(proxyServer.URL) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + + responseBody, _ := ioutil.ReadAll(resp.Body) + if want, have := "hey", string(responseBody); want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestServerOriginServerNotFoundResponse(t *testing.T) { + originServer := httptest.NewServer(http.NotFoundHandler()) + defer originServer.Close() + originURL, _ := url.Parse(originServer.URL) + + handler := httptransport.NewServer( + context.Background(), + originURL, + ) + proxyServer := httptest.NewServer(handler) + defer proxyServer.Close() + + resp, _ := http.Get(proxyServer.URL) + if want, have := http.StatusNotFound, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestServerOriginServerUnreachable(t *testing.T) { + // create a server, then promptly shut it down + originServer := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + originURL, _ := url.Parse(originServer.URL) + originServer.Close() + + handler := httptransport.NewServer( + context.Background(), + originURL, + ) + proxyServer := httptest.NewServer(handler) + defer proxyServer.Close() + + resp, _ := http.Get(proxyServer.URL) + if want, have := http.StatusInternalServerError, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } +}