Skip to content

Commit 290d2ab

Browse files
committed
Fixes a data-race in audit/logger observers
Fixes: https://bugs.launchpad.net/juju-core/+bug/1598063 Previously, Observers were called into at both the API server level and the RPC request level. This was causing issues as state is intended to be stored within the observer instead of being passed in. The crux of the problem was that the Observer was doing too much. Observers now instead know how to create rpc.Observers which are then created for each RPC request. This isolates state, and the types which observe events are now split along RPC and API server delineations.
1 parent 98ddf55 commit 290d2ab

File tree

11 files changed

+322
-177
lines changed

11 files changed

+322
-177
lines changed

apiserver/observer/audit.go

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,6 @@ func (a *Audit) Login(tag string) {
6262
a.state.authenticatedTag = tag
6363
}
6464

65-
// ServerRequest implements Observer.
66-
func (a *Audit) ServerRequest(hdr *rpc.Header, body interface{}) {
67-
auditEntry := a.boilerplateAuditEntry()
68-
//auditEntry.OriginIP =
69-
auditEntry.OriginType = "API request"
70-
auditEntry.OriginName = a.state.authenticatedTag
71-
auditEntry.Operation = rpcRequestToOperation(hdr.Request)
72-
auditEntry.Data = map[string]interface{}{"request-body": body}
73-
err := a.handleAuditEntry(auditEntry)
74-
if err != nil {
75-
a.errorHandler(errors.Trace(err))
76-
}
77-
}
78-
7965
// Join implements Observer.
8066
func (a *Audit) Join(req *http.Request) {
8167
a.state.remoteAddress = req.RemoteAddr
@@ -87,22 +73,53 @@ func (a *Audit) Leave() {
8773
a.state.authenticatedTag = ""
8874
}
8975

90-
// ClientRequest implements Observer.
91-
func (a *Audit) ClientRequest(hdr *rpc.Header, body interface{}) {}
76+
// RPCObserver implements Observer.
77+
func (a *Audit) RPCObserver() rpc.Observer {
78+
return &AuditRPCObserver{
79+
jujuServerVersion: a.jujuServerVersion,
80+
modelUUID: a.modelUUID,
81+
errorHandler: a.errorHandler,
82+
handleAuditEntry: a.handleAuditEntry,
83+
authenticatedTag: a.state.authenticatedTag,
84+
remoteAddress: a.state.remoteAddress,
85+
}
86+
}
9287

93-
// ServerReply implements Observer.
94-
func (a *Audit) ServerReply(rpc.Request, *rpc.Header, interface{}) {}
88+
// AuditRPCObserver is an observer which will log RPC requests using
89+
// the function provided.
90+
type AuditRPCObserver struct {
91+
jujuServerVersion version.Number
92+
modelUUID string
93+
errorHandler ErrorHandler
94+
handleAuditEntry audit.AuditEntrySinkFn
95+
authenticatedTag string
96+
remoteAddress string
97+
}
98+
99+
// ServerRequest implements Observer.
100+
func (a *AuditRPCObserver) ServerRequest(hdr *rpc.Header, body interface{}) {
101+
auditEntry := a.boilerplateAuditEntry()
102+
auditEntry.OriginName = a.authenticatedTag
103+
104+
auditEntry.OriginType = "API request"
105+
auditEntry.Operation = rpcRequestToOperation(hdr.Request)
106+
auditEntry.Data = map[string]interface{}{"request-body": body}
107+
err := a.handleAuditEntry(auditEntry)
108+
if err != nil {
109+
a.errorHandler(errors.Trace(err))
110+
}
111+
}
95112

96-
// ClientReply implements Observer.
97-
func (a *Audit) ClientReply(req rpc.Request, hdr *rpc.Header, body interface{}) {}
113+
// ServerReply implements Observer.
114+
func (a *AuditRPCObserver) ServerReply(rpc.Request, *rpc.Header, interface{}) {}
98115

99-
func (a *Audit) boilerplateAuditEntry() audit.AuditEntry {
116+
func (a *AuditRPCObserver) boilerplateAuditEntry() audit.AuditEntry {
100117
return audit.AuditEntry{
101118
JujuServerVersion: a.jujuServerVersion,
102119
ModelUUID: a.modelUUID,
103120
Timestamp: time.Now().UTC(),
104-
RemoteAddress: a.state.remoteAddress,
105-
OriginName: a.state.authenticatedTag,
121+
RemoteAddress: a.remoteAddress,
122+
OriginName: a.authenticatedTag,
106123
}
107124
}
108125

apiserver/observer/fakeobserver/instance.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,43 +13,47 @@ import (
1313
"github.com/juju/testing"
1414
)
1515

16+
// Instance is a fake Observer used for testing.
1617
type Instance struct {
1718
testing.Stub
1819
}
1920

21+
// Join implements Observer.
2022
func (f *Instance) Join(req *http.Request) {
2123
f.AddCall(funcName(), req)
2224
}
2325

26+
// Leave implements Observer.
2427
func (f *Instance) Leave() {
2528
f.AddCall(funcName())
2629
}
2730

28-
// ServerReply implements Observer.
29-
func (f *Instance) ServerReply(req rpc.Request, hdr *rpc.Header, body interface{}) {
30-
f.AddCall(funcName(), req, hdr, body)
31-
}
32-
33-
// ServerRequest implements Observer.
34-
func (f *Instance) ServerRequest(hdr *rpc.Header, body interface{}) {
35-
f.AddCall(funcName(), hdr, body)
36-
}
37-
3831
// Login implements Observer.
3932
func (f *Instance) Login(entityName string) {
4033
f.AddCall(funcName(), entityName)
4134
}
4235

43-
// ClientRequest implements Observer.
44-
func (f *Instance) ClientRequest(hdr *rpc.Header, body interface{}) {
45-
f.AddCall(funcName(), hdr, body)
36+
// RPCObserver implements Observer.
37+
func (f *Instance) RPCObserver() rpc.Observer {
38+
f.AddCall(funcName())
39+
return &RPCInstance{}
4640
}
4741

48-
// ClientReply implements Observer.
49-
func (f *Instance) ClientReply(req rpc.Request, hdr *rpc.Header, body interface{}) {
42+
// RPCInstance is a fake RPCObserver used for testing.
43+
type RPCInstance struct {
44+
testing.Stub
45+
}
46+
47+
// ServerReply implements Observer.
48+
func (f *RPCInstance) ServerReply(req rpc.Request, hdr *rpc.Header, body interface{}) {
5049
f.AddCall(funcName(), req, hdr, body)
5150
}
5251

52+
// ServerRequest implements Observer.
53+
func (f *RPCInstance) ServerRequest(hdr *rpc.Header, body interface{}) {
54+
f.AddCall(funcName(), hdr, body)
55+
}
56+
5357
// funcName returns the name of the function/method that called
5458
// funcName() It panics if this is not possible.
5559
func funcName() string {

apiserver/observer/observer.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// Observer defines a type which will observe API server events as
1414
// they happen.
1515
type Observer interface {
16-
rpc.RequestNotifier
16+
rpc.ObserverFactory
1717

1818
// Login informs an Observer that an entity has logged in.
1919
Login(string)
@@ -84,21 +84,22 @@ func (m *Multiplexer) Leave() {
8484
mapConcurrent(Observer.Leave, m.observers)
8585
}
8686

87-
// ServerReply implements Observer.
88-
func (m *Multiplexer) ServerReply(req rpc.Request, hdr *rpc.Header, body interface{}) {
89-
mapConcurrent(func(o Observer) { o.ServerReply(req, hdr, body) }, m.observers)
90-
}
91-
92-
// ServerRequest implements Observer.
93-
func (m *Multiplexer) ServerRequest(hdr *rpc.Header, body interface{}) {
94-
mapConcurrent(func(o Observer) { o.ServerRequest(hdr, body) }, m.observers)
95-
}
96-
9787
// Login implements Observer.
9888
func (m *Multiplexer) Login(entityName string) {
9989
mapConcurrent(func(o Observer) { o.Login(entityName) }, m.observers)
10090
}
10191

92+
// RPCObserver implements Observer. It will create an
93+
// rpc.ObserverMultiplexer by calling all the Observer's RPCObserver
94+
// methods.
95+
func (m *Multiplexer) RPCObserver() rpc.Observer {
96+
rpcObservers := make([]rpc.Observer, len(m.observers))
97+
for i, o := range m.observers {
98+
rpcObservers[i] = o.RPCObserver()
99+
}
100+
return rpc.NewObserverMultiplexer(rpcObservers...)
101+
}
102+
102103
// mapConcurrent calls fn on all observers concurrently and then waits
103104
// for all calls to exit before returning.
104105
func mapConcurrent(fn func(Observer), observers []Observer) {

apiserver/observer/observer_test.go

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@ import (
1111

1212
"github.com/juju/juju/apiserver/observer"
1313
"github.com/juju/juju/apiserver/observer/fakeobserver"
14-
"github.com/juju/juju/rpc"
1514
)
1615

17-
type observerSuite struct {
16+
type multiplexerSuite struct {
1817
testing.IsolationSuite
1918
}
2019

21-
var _ = gc.Suite(&observerSuite{})
20+
var _ = gc.Suite(&multiplexerSuite{})
2221

23-
func (*observerSuite) TestObserverFactoryMultiplexer_CallsAllFactories(c *gc.C) {
22+
func (*multiplexerSuite) TestObserverFactoryMultiplexer_CallsAllFactories(c *gc.C) {
2423
callCount := 0
2524
factories := []observer.ObserverFactory{
2625
func() observer.Observer { callCount++; return nil },
@@ -35,7 +34,7 @@ func (*observerSuite) TestObserverFactoryMultiplexer_CallsAllFactories(c *gc.C)
3534
c.Check(callCount, gc.Equals, 2)
3635
}
3736

38-
func (*observerSuite) TestJoin_CallsAllObservers(c *gc.C) {
37+
func (*multiplexerSuite) TestJoin_CallsAllObservers(c *gc.C) {
3938
observers := []*fakeobserver.Instance{
4039
&fakeobserver.Instance{},
4140
&fakeobserver.Instance{},
@@ -50,7 +49,7 @@ func (*observerSuite) TestJoin_CallsAllObservers(c *gc.C) {
5049
}
5150
}
5251

53-
func (*observerSuite) TestLeave_CallsAllObservers(c *gc.C) {
52+
func (*multiplexerSuite) TestLeave_CallsAllObservers(c *gc.C) {
5453
observers := []*fakeobserver.Instance{
5554
&fakeobserver.Instance{},
5655
&fakeobserver.Instance{},
@@ -64,44 +63,21 @@ func (*observerSuite) TestLeave_CallsAllObservers(c *gc.C) {
6463
}
6564
}
6665

67-
func (*observerSuite) TestServerReply_CallsAllObservers(c *gc.C) {
66+
func (*multiplexerSuite) TestRPCObserver_CallsAllObservers(c *gc.C) {
6867
observers := []*fakeobserver.Instance{
6968
&fakeobserver.Instance{},
7069
&fakeobserver.Instance{},
7170
}
7271

7372
o := observer.NewMultiplexer(observers[0], observers[1])
74-
var (
75-
req rpc.Request
76-
hdr rpc.Header
77-
body string
78-
)
79-
o.ServerReply(req, &hdr, body)
73+
o.RPCObserver()
8074

8175
for _, f := range observers {
82-
f.CheckCall(c, 0, "ServerReply", req, &hdr, body)
76+
f.CheckCall(c, 0, "RPCObserver")
8377
}
8478
}
8579

86-
func (*observerSuite) TestServerRequest_CallsAllObservers(c *gc.C) {
87-
observers := []*fakeobserver.Instance{
88-
&fakeobserver.Instance{},
89-
&fakeobserver.Instance{},
90-
}
91-
92-
o := observer.NewMultiplexer(observers[0], observers[1])
93-
var (
94-
hdr rpc.Header
95-
body string
96-
)
97-
o.ServerRequest(&hdr, body)
98-
99-
for _, f := range observers {
100-
f.CheckCall(c, 0, "ServerRequest", &hdr, body)
101-
}
102-
}
103-
104-
func (*observerSuite) TestLogin_CallsAllObservers(c *gc.C) {
80+
func (*multiplexerSuite) TestLogin_CallsAllObservers(c *gc.C) {
10581
observers := []*fakeobserver.Instance{
10682
&fakeobserver.Instance{},
10783
&fakeobserver.Instance{},

0 commit comments

Comments
 (0)