ObservableEventListener in a custom out of process host

Topics: Semantic Logging Application Block
Oct 21, 2013 at 3:17 PM
I have created a custom out of process event listener application using TraceEventService class.
Is it possible to use the ObservableEventListener in this custom (out of process) host?
And I would like to configure for example a flat file sink for the host programmatically. Can I do that without loading a xml-file?

Thanks!
Oct 21, 2013 at 6:48 PM
I believe that ObservableEventListener is only useful for an in-process scenario. The existing service uses an EventEntrySubject which implements both IObservable<EventEntry> and IObserver<EventEntry>.

ObservableEventListener only implements IObservable<EventEntry> where the TraceEventServiceConfiguration wants IObserver<EventEntry>. Unfortunately, EventEntrySubject is an internal sealed class so you can't use it easily (with reflection it would be possible). Another alternative would be to copy the the code in your application. It would look like this:
    /// <summary>
    /// A subject that can be observed and publish events.
    /// </summary>    
    /// <remarks>
    /// This is a very basic implementation of a subject to avoid references to Rx when the
    /// end user might not want to do advanced filtering and projection of event streams.
    /// </remarks>
    internal sealed class EventEntrySubject : IObservable<EventEntry>, IObserver<EventEntry>, IDisposable
    {
        private readonly object lockObject = new object();
        private volatile ReadOnlyCollection<IObserver<EventEntry>> observers = new List<IObserver<EventEntry>>().AsReadOnly();
        private volatile bool isFrozen = false;

        /// <summary>
        /// Releases all resources used by the current instance and unsubscribes all the observers.
        /// </summary>
        public void Dispose()
        {
            this.OnCompleted();
        }

        /// <summary>
        /// Notifies the provider that an observer is to receive notifications.
        /// </summary>
        /// <param name="observer">The object that is to receive notifications.</param>
        /// <returns>A reference to an interface that allows observers to stop receiving notifications
        /// before the provider has finished sending them.</returns>
        public IDisposable Subscribe(IObserver<EventEntry> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (this.lockObject)
            {
                if (!this.isFrozen)
                {
                    var copy = this.observers.ToList();
                    copy.Add(observer);
                    this.observers = copy.AsReadOnly();
                    return new Subscription(this, observer);
                }
            }

            observer.OnCompleted();
            return new EmptyDisposable();
        }

        private void Unsubscribe(IObserver<EventEntry> observer)
        {
            lock (this.lockObject)
            {
                this.observers = this.observers.Where(x => !observer.Equals(x)).ToList().AsReadOnly();
            }
        }

        /// <summary>
        /// Notifies the observer that the provider has finished sending push-based notifications.
        /// </summary>
        public void OnCompleted()
        {
            var currentObservers = this.TakeObserversAndFreeze();

            if (currentObservers != null)
            {
                Parallel.ForEach(currentObservers, observer => observer.OnCompleted());
            }
        }

        /// <summary>
        /// Notifies the observer that the provider has experienced an error condition.
        /// </summary>
        /// <param name="error">An object that provides additional information about the error.</param>
        public void OnError(Exception error)
        {
            var currentObservers = TakeObserversAndFreeze();

            if (currentObservers != null)
            {
                Parallel.ForEach(currentObservers, observer => observer.OnError(error));
            }
        }

        /// <summary>
        /// Provides the observers with new data.
        /// </summary>
        /// <param name="value">The current notification information.</param>
        public void OnNext(EventEntry value)
        {
            foreach (var observer in this.observers)
            {
                // TODO: should I isolate errors (i.e: try/catch around each OnNext call)?
                observer.OnNext(value);
            }
        }

        private ReadOnlyCollection<IObserver<EventEntry>> TakeObserversAndFreeze()
        {
            lock (this.lockObject)
            {
                if (!this.isFrozen)
                {
                    this.isFrozen = true;
                    var copy = this.observers;
                    this.observers = new List<IObserver<EventEntry>>().AsReadOnly();

                    return copy;
                }

                return null;
            }
        }

        private sealed class Subscription : IDisposable
        {
            private IObserver<EventEntry> observer;
            private EventEntrySubject subject;

            public Subscription(EventEntrySubject subject, IObserver<EventEntry> observer)
            {
                this.subject = subject;
                this.observer = observer;
            }

            public void Dispose()
            {
                var current = Interlocked.Exchange<IObserver<EventEntry>>(ref this.observer, null);
                if (current != null)
                {
                    this.subject.Unsubscribe(current);
                    this.subject = null;
                }
            }
        }

        private sealed class EmptyDisposable : IDisposable
        {
            public void Dispose()
            {
            }
        }
    }
With that in place you can programmatically configure the TraceEventService:
var sinkSettingsList = new List<SinkSettings>();

var subject = new EventEntrySubject();
subject.LogToFlatFile("file.log");

var sinkSettings = new SinkSettings("Flat File",
    subject,
    new List<EventSourceSettings>() { new EventSourceSettings("MyEventSource") });

sinkSettingsList.Add(sinkSettings);

var configuration = new TraceEventServiceConfiguration(sinkSettingsList);

using (var service = new TraceEventService(configuration))
{
    // ...
}

The above will work but perhaps there is a better way to do this than duplicating code so I'm open to other ideas.

~~
Randy Levy
entlib.support@live.com
Enterprise Library support engineer
Support How-to
Marked as answer by Behrus on 10/22/2013 at 12:26 AM
Oct 22, 2013 at 8:28 AM
Thank you for providing the solution. It works perfect in my scenario and is good for now.
But you say ObservableEventListener is only useful in an in-process scenario, why? In my opinion, what you want to log or see in the log file is not depending on which program is listening to it or writing the log. e.g. flushing the buffer on specific log level.
Oct 22, 2013 at 4:21 PM
Edited Oct 22, 2013 at 7:25 PM
I understand what you are saying -- it would be nice to have an abstraction to handle both situations.

ObservableEventListener is an implementation of EventListener. From MSDN:
An event listener represents the target for all events generated by event source (EventSource object) implementations in the current application domain. When a new event listener is created, it is logically attached to all event sources in that application domain.

So the EventListener implementation is an in-process listener which internally uses EventEntrySubject to notify observers (sinks). In the out-of-process scenario the TraceEventService will directly open an ETW event source for processing. I'm going to guess that one of the goals of the ObservableEventListener was to make it simple to use since developers would be using that class for in-process logging whereas the ETW Service would usually be something that would be used as is. The common logic being in the EventEntrySubject (which is, unfortunately, not exposed publicly).

Not sure if that helps you though. :)

I was thinking about it and there is another way to get the EventEntrySubject from the ObservableEventListener that uses reflection. It's less code but relies on Enterprise Library internals. So on the one hand if the internals change the code could break; however,in the first example any bug fixes/features in subsequent releases wouldn't be automatically used (since the code is copied).
var listener = new ObservableEventListener();
listener.LogToFlatFile("file2.log");

IObserver<EventEntry> subject = listener.GetType()
    .GetField("subject", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)
    .GetValue(listener) as IObserver<EventEntry>;

var sinkSettingsList = new List<SinkSettings>();

var sinkSettings = new SinkSettings("Flat File",
    subject,
    new List<EventSourceSettings>() { new EventSourceSettings("MyEventSource") });

sinkSettingsList.Add(sinkSettings);

var configuration = new TraceEventServiceConfiguration(sinkSettingsList);

using (var service = new TraceEventService(configuration))
{
//...
}
~~
Randy Levy
entlib.support@live.com
Enterprise Library support engineer
Support How-to
Oct 22, 2013 at 7:34 PM
I was thinking that an extension method could hide some of the reflection ugliness (and would help with DRY):
public static class ObservableEventListenerExtension
{
    public static IObserver<EventEntry> GetEventEntryObserver(this ObservableEventListener listener)
    {
        return listener.GetType().GetField("subject",
            System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)
            .GetValue(listener) as IObserver<EventEntry>;
    }
}

var listener = new ObservableEventListener();
listener.LogToFlatFile("file2.log");

var sinkSettingsList = new List<SinkSettings>();

var sinkSettings = new SinkSettings("Flat File",
    listener.GetEventEntryObserver(),
    new List<EventSourceSettings>() { new EventSourceSettings("MyEventSource") });

sinkSettingsList.Add(sinkSettings);

var configuration = new TraceEventServiceConfiguration(sinkSettingsList);

using (var service = new TraceEventService(configuration))
{
    // ...
}

It doesn't really mitigate the major issue with the approach (accessing private member variables) but does make it look a little nicer.

~~
Randy Levy
entlib.support@live.com
Enterprise Library support engineer
Support How-to
Marked as answer by Behrus on 10/23/2013 at 4:08 AM
Oct 23, 2013 at 12:08 PM
Very nice solution, thanks again for that and for the explanation. It feels like a hack but it does the job!
For us are the out-of-process logging and observability of entries much more interesting than structured nature of logging itself. So it would be nice if both scenarios would offer the same feature set and would be more extendable in a feature release. It would be also great if ETW or at least those classes were available over nuget.