001/* ***** BEGIN LICENSE BLOCK *****
002 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
003 *
004 * The contents of this file are subject to the Mozilla Public License Version
005 * 1.1 (the "License"); you may not use this file except in compliance with
006 * the License. You may obtain a copy of the License at
007 * http://www.mozilla.org/MPL/
008 *
009 * Software distributed under the License is distributed on an "AS IS" basis,
010 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
011 * for the specific language governing rights and limitations under the
012 * License.
013 *
014 * The Original Code is part of dcm4che, an implementation of DICOM(TM) in
015 * Java(TM), hosted at https://github.com/gunterze/dcm4che3.
016 *
017 * The Initial Developer of the Original Code is
018 * Agfa Healthcare.
019 * Portions created by the Initial Developer are Copyright (C) 2012
020 * the Initial Developer. All Rights Reserved.
021 *
022 * Contributor(s):
023 * See @authors listed below
024 *
025 * Alternatively, the contents of this file may be used under the terms of
026 * either the GNU General Public License Version 2 or later (the "GPL"), or
027 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
028 * in which case the provisions of the GPL or the LGPL are applicable instead
029 * of those above. If you wish to allow use of your version of this file only
030 * under the terms of either the GPL or the LGPL, and not to allow others to
031 * use your version of this file under the terms of the MPL, indicate your
032 * decision by deleting the provisions above and replace them with the notice
033 * and other provisions required by the GPL or the LGPL. If you do not delete
034 * the provisions above, a recipient may use your version of this file under
035 * the terms of any one of the MPL, the GPL or the LGPL.
036 *
037 * ***** END LICENSE BLOCK ***** */
038
039package org.dcm4che3.net.audit;
040
041import com.lmax.disruptor.*;
042import com.lmax.disruptor.dsl.Disruptor;
043import com.lmax.disruptor.dsl.ProducerType;
044import org.dcm4che3.audit.*;
045import org.dcm4che3.audit.AuditMessages.RoleIDCode;
046import org.dcm4che3.audit.AuditMessage;
047import org.dcm4che3.conf.core.api.ConfigurableClass;
048import org.dcm4che3.conf.core.api.ConfigurableProperty;
049import org.dcm4che3.conf.core.api.LDAP;
050import org.dcm4che3.net.Connection;
051import org.dcm4che3.net.Device;
052import org.dcm4che3.net.DeviceExtension;
053import org.dcm4che3.net.IncompatibleConnectionException;
054import org.dcm4che3.util.SafeClose;
055import org.dcm4che3.util.StreamUtils;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import javax.net.ssl.SSLContext;
060
061import java.io.*;
062import java.lang.management.ManagementFactory;
063import java.net.*;
064import java.nio.charset.Charset;
065import java.security.GeneralSecurityException;
066import java.util.*;
067import java.util.concurrent.Executor;
068import java.util.concurrent.Executors;
069import java.util.concurrent.ScheduledFuture;
070import java.util.concurrent.TimeUnit;
071
072/**
073 * @author Gunter Zeilinger <gunterze@gmail.com>
074 * @author Michael Backhaus <michael.backhaus@agfa.com>
075 */
076@LDAP(objectClasses = "dcmAuditLogger")
077@ConfigurableClass
078public class AuditLogger extends DeviceExtension {
079
    // Marker value for an audit source type code entry: when encountered in
    // createAuditSourceIdentification() it is expanded to the primary device
    // types of the owning device instead of being used literally.
    private static final String DICOM_PRIMARY_DEVICE_TYPE = "dicomPrimaryDeviceType";

    // NOTE(review): usage is outside the visible part of the file; the name
    // suggests a separator placed around the device name in spool file names.
    private static final String DEVICE_NAME_IN_FILENAME_SEPARATOR = "-._";

    // Shared (static) disruptor; writeAsync() obtains it through getDisruptor(this).
    private static Disruptor<AuditMessageEvent> disruptor;
085
086    public enum SendStatus {
087        SENT, QUEUED, SUPPRESSED
088    }
089
090    private static final long serialVersionUID = 1595714214186063103L;
091
092    private static final int MSG_PROMPT_LEN = 8192;
093
094    private static Logger LOG = LoggerFactory.getLogger(AuditLogger.class);
095
096    public enum Facility {
097        kern,            // (0) -- kernel messages
098        user,            // (1) -- user-level messages
099        mail,            // (2) -- mail system messages
100        daemon,          // (3) -- system daemons' messages
101        auth,            // (4) -- authorization messages
102        syslog,          // (5) -- messages generated internally by syslogd
103        lpr,             // (6) -- line printer subsystem messages
104        news,            // (7) -- network news subsystem messages
105        uucp,            // (8) -- UUCP subsystem messages
106        cron,            // (9) -- clock daemon messages
107        authpriv,        // (10)-- security/authorization messages
108        ftp,             // (11)-- ftp daemon messages
109        ntp,             // (12)-- NTP subsystem messages
110        audit,           // (13)-- audit messages
111        console,         // (14)-- console messages
112        cron2,           // (15)-- clock daemon messages
113        local0,          // (16)
114        local1,          // (17)
115        local2,          // (18)
116        local3,          // (19)
117        local4,          // (20)
118        local5,          // (21)
119        local6,          // (22)
120        local7,          // (23)
121    }
122
123    public enum Severity {
124        emerg,           // (0)  -- emergency; system is unusable
125        alert,           // (1)  -- action must be taken immediately
126        crit,            // (2)  -- critical condition
127        err,             // (3)  -- error condition
128        warning,         // (4)  -- warning condition
129        notice,          // (5)  -- normal but significant condition
130        info,            // (6)  -- informational message
131        debug            // (7)  -- debug-level messages
132    }
133
    /** Default value of the configurable message ID (see {@code messageID}). */
    public static final String MESSAGE_ID = "IHE+RFC-3881";

    // Lookup table: for an index 0..99, the character of its ones digit.
    // NOTE(review): presumably used to format two-digit fields without
    // allocation; the consumer is outside the visible part of the file.
    private static final int[] DIGITS_0X = {
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
            '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    };
    // Lookup table: for an index 0..99, the character of its tens digit.
    private static final int[] DIGITS_X0 = {
            '0', '0', '0', '0', '0', '0', '0', '0', '0', '0',
            '1', '1', '1', '1', '1', '1', '1', '1', '1', '1',
            '2', '2', '2', '2', '2', '2', '2', '2', '2', '2',
            '3', '3', '3', '3', '3', '3', '3', '3', '3', '3',
            '4', '4', '4', '4', '4', '4', '4', '4', '4', '4',
            '5', '5', '5', '5', '5', '5', '5', '5', '5', '5',
            '6', '6', '6', '6', '6', '6', '6', '6', '6', '6',
            '7', '7', '7', '7', '7', '7', '7', '7', '7', '7',
            '8', '8', '8', '8', '8', '8', '8', '8', '8', '8',
            '9', '9', '9', '9', '9', '9', '9', '9', '9', '9',
    };
    // The three bytes EF BB BF (the UTF-8 byte order mark).
    private static final byte[] BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
    private static final char SYSLOG_VERSION = '1';
    // Both values are computed once, when the class is initialized.
    private static final InetAddress localHost = localHost();
    private static final String processID = processID();
    // Orders files by ascending last-modified time (oldest first).
    private static final Comparator<File> FILE_COMPARATOR = new Comparator<File>() {
        @Override
        public int compare(File o1, File o2) {
            long diff = o1.lastModified() - o2.lastModified();
            return diff < 0 ? -1 : diff > 0 ? 1 : 0;
        }
    };
    private static volatile AuditLogger defaultLogger;
172
    // Devices acting as Audit Record Repositories; replaced as a whole by
    // setAuditRecordRepositoryDevices().
    @ConfigurableProperty(name = "dcmAuditRecordRepositoryDeviceReference",
            label = "ARR Device",
            description = "Devices that correspond to Audit Record Repositories to which audit messages are sent",
            tags = ConfigurableProperty.Tag.PRIMARY,
            collectionOfReferences = true)
    private List<Device> auditRecordRepositoryDevices = new ArrayList<Device>();

    // Stored by ordinal: the default "10" is Facility.authpriv.
    @ConfigurableProperty(
            name = "dcmAuditFacility",
            enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL,
            defaultValue = "10")
    private Facility facility = Facility.authpriv;

    // Stored by ordinal: the default "5" is Severity.notice.
    @ConfigurableProperty(
            name = "dcmAuditSuccessSeverity",
            enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL,
            defaultValue = "5"
    )
    private Severity successSeverity = Severity.notice;

    // Stored by ordinal: the default "4" is Severity.warning.
    @ConfigurableProperty(
            name = "dcmAuditMinorFailureSeverity",
            enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL,
            defaultValue = "4"
    )
    private Severity minorFailureSeverity = Severity.warning;

    // Stored by ordinal: the default "3" is Severity.err.
    @ConfigurableProperty(
            name = "dcmAuditSeriousFailureSeverity",
            enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL,
            defaultValue = "3"
    )
    private Severity seriousFailureSeverity = Severity.err;

    // Stored by ordinal: the default "2" is Severity.crit.
    @ConfigurableProperty(
            name = "dcmAuditMajorFailureSeverity",
            enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL,
            defaultValue = "2"
    )
    private Severity majorFailureSeverity = Severity.crit;


    // When null, applicationName() falls back to the audit source ID.
    @ConfigurableProperty(name = "dcmAuditApplicationName")
    private String applicationName;

    // When null, auditSourceID() falls back to the name of the owning device.
    @ConfigurableProperty(name = "dcmAuditSourceID")
    private String auditSourceID;

    // The literal value "dicomInstitutionName" is resolved to the first
    // institution name of the device in createAuditSourceIdentification().
    @ConfigurableProperty(name = "dcmAuditEnterpriseSiteID")
    private String auditEnterpriseSiteID;

    // Entries equal to DICOM_PRIMARY_DEVICE_TYPE are expanded to the primary
    // device types of the device in createAuditSourceIdentification().
    @ConfigurableProperty(name = "dcmAuditSourceTypeCode")
    private String[] auditSourceTypeCodes = {};

    @ConfigurableProperty(name = "dcmAuditMessageID", defaultValue = MESSAGE_ID)
    private String messageID = MESSAGE_ID;

    // setEncoding() only accepts names for which Charset.isSupported() is true.
    @ConfigurableProperty(name = "dcmAuditMessageEncoding", defaultValue = "UTF-8")
    private String encoding = "UTF-8";

    @ConfigurableProperty(name = "dcmAuditMessageSchemaURI", defaultValue =AuditMessages.SCHEMA_URI )
    private String schemaURI = AuditMessages.SCHEMA_URI;

    // Selects a UTC calendar instead of the default time zone in timeStamp().
    @ConfigurableProperty(name = "dcmAuditTimestampInUTC", defaultValue = "false")
    private boolean timestampInUTC = false;

    @ConfigurableProperty(name = "dcmAuditMessageBOM", defaultValue = "true")
    private boolean includeBOM = true;

    @ConfigurableProperty(name = "dcmAuditMessageFormatXML", defaultValue = "false")
    private boolean formatXML;

    @ConfigurableProperty(name = "dcmAuditMessageSupplement95Schema", defaultValue = "false")
    private boolean supplement95;

    // null means "not set": isInstalled() then depends on the device alone.
    @ConfigurableProperty(name = "dicomInstalled")
    private Boolean auditLoggerInstalled;

    @ConfigurableProperty(name = "dcmAuditIncludeInstanceUID")
    private Boolean doIncludeInstanceUID = false;

    // NOTE(review): the visible accessors get/setSpoolDirectoryURI operate on
    // spoolDirectory and never touch this field - confirm how the
    // configuration framework binds it.
    @ConfigurableProperty(name = "dcmAuditLoggerSpoolDirectoryURI")
    private String spoolDirectoryURI;

    private File spoolDirectory;

    // Prefix and suffix that a file name must carry to pass FILENAME_FILTER.
    private String spoolFileNamePrefix = "audit";
    private String spoolFileNameSuffix = ".log";

    // Retry interval in seconds; 0 means failed messages are not spooled.
    @ConfigurableProperty(name = "dcmAuditLoggerRetryInterval", defaultValue = "0")
    private int retryInterval;

    @LDAP(
            noContainerNode = true,
            distinguishingField = "cn"
    )
    @ConfigurableProperty(name = "dcmAuditSuppressCriteria")
    private final List<AuditSuppressCriteria> suppressAuditMessageFilters =
            new ArrayList<AuditSuppressCriteria>(0);

    @ConfigurableProperty(name = "dicomNetworkConnectionReference",
            label = "Connections",
            description = "Connections that can be used to send audit messages",
            tags = ConfigurableProperty.Tag.PRIMARY,
            collectionOfReferences = true)
    private List<Connection> connections = new ArrayList<Connection>(1);

    // Runtime-only state, excluded from serialization.
    // NOTE(review): transient fields with initializers (activeConnection,
    // FILENAME_FILTER) are not re-initialized by Java deserialization - confirm
    // instances are never deserialized, or restore them in readObject.
    private transient MessageBuilder builder;
    private transient Map<String,ActiveConnection> activeConnection = new HashMap<String, ActiveConnection>();
    private transient ScheduledFuture<?> retryTimer;
    private transient Exception lastException;
    private transient long lastSentTimeInMillis;
    // Accepts file names carrying the current spool prefix and suffix.
    private transient final FilenameFilter FILENAME_FILTER = new FilenameFilter() {
        @Override
        public boolean accept(File dir, String name) {
            return name.startsWith(spoolFileNamePrefix) && name.endsWith(spoolFileNameSuffix);
        }
    };
291
292
293    public List<AuditSuppressCriteria> getSuppressAuditMessageFilters() {
294        return suppressAuditMessageFilters;
295    }
296
297    public void setSuppressAuditMessageFilters(List<AuditSuppressCriteria> suppressAuditMessageFilters) {
298        this.suppressAuditMessageFilters.clear();
299        for (AuditSuppressCriteria filter : suppressAuditMessageFilters) this.suppressAuditMessageFilters.add(filter);
300    }
301
302    public final List<Device> getAuditRecordRepositoryDevices() {
303        return auditRecordRepositoryDevices;
304    }
305
306    public List<String> getAuditRecordRepositoryDeviceNames() {
307        if (auditRecordRepositoryDevices == null)
308            throw new IllegalStateException("AuditRecordRepositoryDevice not initalized");
309        List<String> names = new ArrayList<String>(auditRecordRepositoryDevices.size());
310        for (Device d : auditRecordRepositoryDevices) {
311                names.add(d.getDeviceName());
312        }
313        return names;
314    }
315
316    public void setAuditRecordRepositoryDevices(List<Device> arrDevices) {
317        for (ActiveConnection c : activeConnection.values())
318                SafeClose.close(c);
319        activeConnection.clear();
320        if(arrDevices==null){
321            this.auditRecordRepositoryDevices.clear();
322        }else{
323            this.auditRecordRepositoryDevices = arrDevices;
324        }
325    }
326    
327    public void addAuditRecordRepositoryDevice(Device device) {
328        auditRecordRepositoryDevices.add(device);
329    }
330
331    public boolean removeAuditRecordRepositoryDevice(Device device) {
332        return auditRecordRepositoryDevices.remove(device);
333    }
334
335    public final Facility getFacility() {
336        return facility;
337    }
338
339    public final void setFacility(Facility facility) {
340        if (facility == null)
341            throw new NullPointerException();
342        this.facility = facility;
343    }
344
345    public final Severity getSuccessSeverity() {
346        return successSeverity;
347    }
348
349    public final void setSuccessSeverity(Severity severity) {
350        if (severity == null)
351            throw new NullPointerException();
352        this.successSeverity = severity;
353    }
354
355    public final Severity getMinorFailureSeverity() {
356        return minorFailureSeverity;
357    }
358
359    public final void setMinorFailureSeverity(Severity severity) {
360        if (severity == null)
361            throw new NullPointerException();
362        this.minorFailureSeverity = severity;
363    }
364
365    public final Severity getSeriousFailureSeverity() {
366        return seriousFailureSeverity;
367    }
368
369    public final void setSeriousFailureSeverity(Severity severity) {
370        if (severity == null)
371            throw new NullPointerException();
372        this.seriousFailureSeverity = severity;
373    }
374
375    public final Severity getMajorFailureSeverity() {
376        return majorFailureSeverity;
377    }
378
379    public final void setMajorFailureSeverity(Severity severity) {
380        if (severity == null)
381            throw new NullPointerException();
382        this.majorFailureSeverity = severity;
383    }
384
385    public final String getApplicationName() {
386        return applicationName;
387    }
388
389    private String applicationName() {
390        return applicationName != null
391                ? applicationName
392                : auditSourceID();
393    }
394
395    public final void setApplicationName(String applicationName) {
396        this.applicationName = applicationName;
397    }
398
399    public final String getAuditSourceID() {
400        return auditSourceID;
401    }
402
403    public final void setAuditSourceID(String auditSourceID) {
404        this.auditSourceID = auditSourceID;
405    }
406
407    private String auditSourceID() {
408        return auditSourceID != null
409                ? auditSourceID
410                : getDevice().getDeviceName();
411    }
412
413    public final String getAuditEnterpriseSiteID() {
414        return auditEnterpriseSiteID;
415    }
416
417    public final void setAuditEnterpriseSiteID(String auditEnterpriseSiteID) {
418        this.auditEnterpriseSiteID = auditEnterpriseSiteID;
419    }
420
421    public String[] getAuditSourceTypeCodes() {
422        return auditSourceTypeCodes;
423    }
424
425    public void setAuditSourceTypeCodes(String... auditSourceTypeCode) {
426        this.auditSourceTypeCodes = auditSourceTypeCode;
427    }
428
429    public ActiveParticipant createActiveParticipant(
430            boolean requestor, RoleIDCode... roleIDs) {
431
432        Collection<String> aets = device.getApplicationAETitles();
433
434        return createActiveParticipant(requestor,
435                processID(),
436                AuditMessages.alternativeUserIDForAETitle(
437                        aets.toArray(new String[aets.size()])),
438                applicationName(),
439                localHost().getHostName(),
440                roleIDs);
441    }
442
443    public ActiveParticipant createActiveParticipant(
444            boolean requestor,
445            String userID,
446            String alternativeUserID,
447            String userName,
448            String hostName,
449            RoleIDCode... roleIDs) {
450        ActiveParticipant ap = new ActiveParticipant();
451        ap.setUserID(userID);
452        ap.setAlternativeUserID(alternativeUserID);
453        ap.setUserName(userName);
454        ap.setUserIsRequestor(requestor);
455        ap.setNetworkAccessPointID(hostName);
456        ap.setNetworkAccessPointTypeCode(AuditMessages.isIP(hostName)
457                ? AuditMessages.NetworkAccessPointTypeCode.IPAddress
458                : AuditMessages.NetworkAccessPointTypeCode.MachineName);
459        for (RoleIDCode roleID : roleIDs)
460            ap.getRoleIDCode().add(roleID);
461        return ap;
462    }
463
464    public AuditSourceIdentification createAuditSourceIdentification() {
465        AuditSourceIdentification asi = new AuditSourceIdentification();
466        asi.setAuditSourceID(auditSourceID());
467        if (auditEnterpriseSiteID != null) {
468            if (auditEnterpriseSiteID.equals("dicomInstitutionName")) {
469                String[] institutionNames = getDevice().getInstitutionNames();
470                if (institutionNames.length > 0)
471                    asi.setAuditEnterpriseSiteID(institutionNames[0]);
472            } else
473                asi.setAuditEnterpriseSiteID(auditEnterpriseSiteID);
474        }
475        for (String code : auditSourceTypeCodes) {
476            if (code.equals(DICOM_PRIMARY_DEVICE_TYPE)) {
477                for (String type : device.getPrimaryDeviceTypes()) {
478                    AuditSourceTypeCode astc = new AuditSourceTypeCode();
479                    astc.setCode(type);
480                    astc.setCodeSystemName("DCM");
481                    asi.getAuditSourceTypeCode().add(astc);
482                }
483            } else {
484                AuditSourceTypeCode astc = new AuditSourceTypeCode();
485                astc.setCode(code);
486                asi.getAuditSourceTypeCode().add(astc);
487            }
488        }
489        return asi;
490    }
491
492    public final String getMessageID() {
493        return messageID;
494    }
495
496    public final void setMessageID(String messageID) {
497        this.messageID = messageID;
498    }
499
500    public final String getEncoding() {
501        return encoding;
502    }
503
504    public final void setEncoding(String encoding) {
505        if (!Charset.isSupported(encoding))
506            throw new IllegalArgumentException(
507                    "Charset not supported: " + encoding);
508        this.encoding = encoding;
509    }
510
511    public final String getSchemaURI() {
512        return schemaURI;
513    }
514
515    public final void setSchemaURI(String schemaURI) {
516        this.schemaURI = schemaURI;
517    }
518
519    public final boolean isTimestampInUTC() {
520        return timestampInUTC;
521    }
522
523    public final void setTimestampInUTC(boolean timestampInUTC) {
524        this.timestampInUTC = timestampInUTC;
525    }
526
527    public final boolean isIncludeBOM() {
528        return includeBOM;
529    }
530
531    public final void setIncludeBOM(boolean includeBOM) {
532        this.includeBOM = includeBOM;
533    }
534
535    public final boolean isFormatXML() {
536        return formatXML;
537    }
538
539    public final void setFormatXML(boolean formatXML) {
540        this.formatXML = formatXML;
541    }
542
543    public boolean isSupplement95() {
544        return supplement95;
545    }
546
547    public void setSupplement95(boolean sup95) {
548        this.supplement95 = sup95;
549    }
550
551    public boolean isInstalled() {
552        return device != null && device.isInstalled()
553                && (auditLoggerInstalled == null || auditLoggerInstalled.booleanValue());
554    }
555
556    public final Boolean getAuditLoggerInstalled() {
557        return auditLoggerInstalled;
558    }
559
560    public void setAuditLoggerInstalled(Boolean installed) {
561        if (installed != null && installed.booleanValue()
562                && device != null && !device.isInstalled())
563            throw new IllegalStateException("owning device not installed");
564        this.auditLoggerInstalled = installed;
565    }
566
567
568    public Boolean isIncludeInstanceUID() {
569        return doIncludeInstanceUID;
570    }
571
572    public Boolean getDoIncludeInstanceUID() {
573        return doIncludeInstanceUID;
574    }
575
576    public void setDoIncludeInstanceUID(Boolean doIncludeInstanceUID) {
577        this.doIncludeInstanceUID = doIncludeInstanceUID;
578    }
579
580    /**
581     * Get spool directory into which messages failed to sent to the record
582     * repository are stored for later re-send.
583     *
584     * @return The directory in which the messages failed to sent are stored,
585     * or {@code null} if the default temporary-file directory is to
586     * be used
587     */
588    public File getSpoolDirectory() {
589        return spoolDirectory;
590    }
591
592    /**
593     * Set spool directory into which messages failed sent to the record
594     * repository are stored for later re-send.
595     *
596     * @param directory The directory in which the messages failed to sent are
597     *                  stored, or {@code null} if the default temporary-file
598     *                  directory is to be used
599     */
600    public void setSpoolDirectory(File directory) {
601        this.spoolDirectory = directory;
602    }
603
604    public String getSpoolDirectoryURI() {
605        return spoolDirectory != null ? spoolDirectory.toURI().toString() : null;
606    }
607
608    public void setSpoolDirectoryURI(String uri) {
609        this.spoolDirectory = uri != null ? new File(URI.create(uri)) : null;
610    }
611
612    public String getSpoolNameFilePrefix() {
613        return spoolFileNamePrefix;
614    }
615
616    public void setSpoolFileNamePrefix(String prefix) {
617        if (prefix.length() < 3)
618            throw new IllegalArgumentException("Spool file name prefix too short");
619        this.spoolFileNamePrefix = prefix;
620    }
621
622    public String getSpoolFileNameSuffix() {
623        return spoolFileNameSuffix;
624    }
625
626    public void setSpoolFileNameSuffix(String suffix) {
627        if (suffix.isEmpty())
628            throw new IllegalArgumentException("Spool file name suffix cannot be empty");
629        this.spoolFileNameSuffix = suffix;
630    }
631
632    /**
633     * Get interval in seconds to retry to sent messages which could not be
634     * sent to the record repository or {@code 0} if messages failed to sent
635     * are not spooled for later re-send.
636     *
637     * @return interval retry interval in seconds or {@code 0}
638     * @see #write(Calendar, AuditMessage)
639     */
640    public int getRetryInterval() {
641        return retryInterval;
642    }
643
644    /**
645     * Set interval in seconds to retry to sent messages which could not be
646     * sent to the record repository or {@code 0} if messages failed to sent
647     * are not spooled for later re-send.
648     *
649     * @param interval retry interval in seconds or {@code 0}
650     * @see #write(Calendar, AuditMessage)
651     */
652    public void setRetryInterval(int interval) {
653        this.retryInterval = interval;
654    }
655
656    public void addConnection(Connection conn) {
657        if (!conn.getProtocol().isSyslog())
658            throw new IllegalArgumentException(
659                    "Audit Logger does not support protocol " + conn.getProtocol());
660        if (device != null && device != conn.getDevice())
661            throw new IllegalStateException(conn + " not contained by " +
662                    device.getDeviceName());
663        connections.add(conn);
664    }
665
666    @Override
667    public void verifyNotUsed(Connection conn) {
668        if (connections.contains(conn))
669            throw new IllegalStateException(conn + " used by Audit Logger");
670    }
671
672    public boolean removeConnection(Connection conn) {
673        return connections.remove(conn);
674    }
675
676
677    public void setConnections(List<Connection> connections) {
678        this.connections.clear();
679        for (Connection connection : connections) addConnection(connection);
680    }
681
682    public List<Connection> getConnections() {
683        return connections;
684    }
685
686    public List<AuditSuppressCriteria> getAuditSuppressCriteriaList() {
687        return suppressAuditMessageFilters;
688    }
689
690    public AuditSuppressCriteria findAuditSuppressCriteriaByCommonName(String cn) {
691        for (AuditSuppressCriteria criteria : suppressAuditMessageFilters) {
692            if (criteria.getCommonName().equals(cn))
693                return criteria;
694        }
695        return null;
696    }
697
698    public void setAuditSuppressCriteriaList(List<AuditSuppressCriteria> filters) {
699        this.suppressAuditMessageFilters.clear();
700        this.suppressAuditMessageFilters.addAll(filters);
701    }
702
703    public void addAuditSuppressCriteria(AuditSuppressCriteria criteria) {
704        this.suppressAuditMessageFilters.add(criteria);
705    }
706
707    public void clearAllAuditSuppressCriteria() {
708        this.suppressAuditMessageFilters.clear();
709    }
710
711    /**
712     * Test if the Event Identification and the Active ActiveParticipant of an
713     * Audit Message matches one of the {@code AuditSuppressCriteria}
714     *
715     * @param msg Audit Message to test
716     * @return {@code true} the specified audit message will be suppressed;
717     * otherwise {@code false}
718     */
719    public boolean isAuditMessageSuppressed(AuditMessage msg) {
720        for (AuditSuppressCriteria criteria : suppressAuditMessageFilters) {
721            if (criteria.match(msg))
722                return true;
723        }
724        return false;
725    }
726
727    @Override
728    public void reconfigure(DeviceExtension from) {
729        reconfigure((AuditLogger) from);
730    }
731
732    private void reconfigure(AuditLogger from) {
733        setFacility(from.facility);
734        setSuccessSeverity(from.successSeverity);
735        setMinorFailureSeverity(from.minorFailureSeverity);
736        setSeriousFailureSeverity(from.seriousFailureSeverity);
737        setMajorFailureSeverity(from.majorFailureSeverity);
738        setApplicationName(from.applicationName);
739        setAuditSourceID(from.auditSourceID);
740        setAuditEnterpriseSiteID(from.auditEnterpriseSiteID);
741        setAuditSourceTypeCodes(from.auditSourceTypeCodes);
742        setMessageID(from.messageID);
743        setEncoding(from.encoding);
744        setSchemaURI(from.schemaURI);
745        setTimestampInUTC(from.timestampInUTC);
746        setIncludeBOM(from.includeBOM);
747        setFormatXML(from.formatXML);
748        setSupplement95(from.isSupplement95());
749        setSpoolDirectory(from.spoolDirectory);
750        setSpoolFileNamePrefix(from.spoolFileNamePrefix);
751        setSpoolFileNameSuffix(from.spoolFileNameSuffix);
752        setRetryInterval(from.retryInterval);
753        setAuditLoggerInstalled(from.auditLoggerInstalled);
754        setAuditRecordRepositoryDevices(from.auditRecordRepositoryDevices);
755        setAuditSuppressCriteriaList(from.suppressAuditMessageFilters);
756        device.reconfigureConnections(connections, from.connections);
757
758        closeActiveConnection();
759    }
760
761    public Calendar timeStamp() {
762        return timestampInUTC
763                ? new GregorianCalendar(TimeZone.getTimeZone("UTC"), Locale.ENGLISH)
764                : new GregorianCalendar(Locale.ENGLISH);
765    }
766
767    /**
768     * Send Audit Message by Syslog Protocol to Audit Record Repository, if the
769     * message does not match any configured {@code AuditSuppressCriteria}. If
770     * an I/O error occurs sending the message to the {@code AuditRecordRepository}
771     * and if a {@code RetryInterval) is configured, the message will be spooled
772     * into the configured {@code SpoolDirectory} for later re-send and the
773     * method returns {@code false}. If no {@code RetryInterval} is configured,
774     * the method throws an {@code IOException) if an I/O error occurs sending
775     * the message.
776     * <p/>
777     * Attention: sending via UDP without getting an I/O error does not ensure
778     * that the Audit Record Repository actually received the message!
779     *
780     * @param timeStamp included in Syslog Header
781     * @param msg       Audit Message
782     * @return {@code SendStatus.SUPPRESSED} if the message was suppressed;
783     * {@code SendStatus.SENT} if the message was successfully emitted;
784     * {@code SendStatus.QUEUED} if the message was spooled for later re-send
785     * @throws IllegalStateException           if there is no {@code AuditRecordRepository} associated with
786     *                                         this {@code AuditLogger}
787     * @throws IncompatibleConnectionException if no {@code Connection) of this {@code AuditLogger} is compatible
788     *                                         with any {@code Connection) of the associated {@code AuditRecordRepository}
789     * @throws GeneralSecurityException        if the {@link  SSLContext} could not get intialized from configured
790     *                                         private key and public certificates
791     * @throws IOException                     if an I/O error occurs sending the message to the {@code AuditRecordRepository}
792     *                                         or on spooling the message to the file system
793     */
794    public SendStatus write(Calendar timeStamp, AuditMessage msg)
795            throws IncompatibleConnectionException, GeneralSecurityException, IOException {
796        if (isAuditMessageSuppressed(msg))
797            return SendStatus.SUPPRESSED;
798
799        return sendMessage(builder().createMessage(timeStamp, msg));
800    }
801
802    public void writeAsync(Calendar timeStamp, AuditMessage msg)
803            throws IncompatibleConnectionException, GeneralSecurityException, IOException, InsufficientCapacityException {
804
805        if (isAuditMessageSuppressed(msg))
806            return;
807
808        RingBuffer<AuditMessageEvent> ringBuffer = getDisruptor(this).getRingBuffer();
809
810        long sequence = ringBuffer.next();  // Grab the next sequence
811        try
812        {
813            AuditMessageEvent msgenrtry = ringBuffer.get(sequence); // Get the entry in the Disruptor
814            // for the sequence
815            msgenrtry.setLogger(this);  // Fill with data
816            msgenrtry.setMessage(msg);
817        }
818        finally
819        {
820            ringBuffer.publish(sequence);
821        }
822    }
823
824    public SendStatus write(Calendar timeStamp, Severity severity,
825                            byte[] data, int off, int len)
826            throws IncompatibleConnectionException, GeneralSecurityException, IOException {
827        return sendMessage(
828                builder().createMessage(timeStamp, severity, data, off, len));
829    }
830
831    private MessageBuilder builder() {
832        if (builder == null)
833            builder = new MessageBuilder();
834
835        return builder;
836    }
837
838    private SendStatus sendMessage(DatagramPacket msg) throws IncompatibleConnectionException,
839            GeneralSecurityException, IOException {
840        String deviceName;
841        SendStatus status = SendStatus.SENT;
842        for (Device arrDev : auditRecordRepositoryDevices) {
843                deviceName = arrDev.getDeviceName();
844                if (getNumberOfQueuedMessages(deviceName) > 0) {
845                    spoolMessage(deviceName, msg);
846                } else {
847                    try {
848                        activeConnection(arrDev).sendMessage(msg);
849                        lastSentTimeInMillis = System.currentTimeMillis();
850                    } catch (IOException e) {
851                        lastException = e;
852                        if (retryInterval > 0) {
853                            LOG.info("Failed to send audit message:", e);
854                            spoolMessage(deviceName, msg);
855                            scheduleRetry();
856                            status = SendStatus.QUEUED;
857                        } else {
858                            throw e;
859                        }
860                    }
861                }
862        }
863        return status;
864    }
865
866    private synchronized void scheduleRetry() {
867        if (retryTimer != null || retryInterval <= 0) {
868            return;
869        }
870
871        LOG.debug("Scheduled retry in {} s", retryInterval);
872        retryTimer = getDevice().schedule(
873                new Runnable() {
874                    @Override
875                    public void run() {
876                        synchronized (AuditLogger.this) {
877                            retryTimer = null;
878                        }
879                        sendQueuedMessages();
880                    }
881                },
882                retryInterval, TimeUnit.SECONDS);
883    }
884
885    private void spoolMessage(String deviceName, DatagramPacket msg) throws IOException {
886        if (spoolDirectory != null)
887            spoolDirectory.mkdirs();
888
889        File f = null;
890        try {
891            f = File.createTempFile(spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+deviceName+DEVICE_NAME_IN_FILENAME_SEPARATOR, spoolFileNameSuffix, spoolDirectory);
892            if (spoolDirectory == null)
893                spoolDirectory = f.getParentFile();
894
895            LOG.info("Spool audit message to {}", f);
896            FileOutputStream out = new FileOutputStream(f);
897            try {
898                out.write(msg.getData(), msg.getOffset(), msg.getLength());
899            } finally {
900                SafeClose.close(out);
901            }
902            f = null;
903        } catch (IOException e) {
904            throw new IOException("Failed to spool audit message for device "+deviceName, e);
905        } finally {
906            if (f != null)
907                f.delete();
908        }
909    }
910
911    public void sendQueuedMessages() {
912        File dir = spoolDirectory;
913        if (dir == null)
914            return;
915        boolean failed = false;
916        for (final Device arrDev : this.auditRecordRepositoryDevices) {
917                try {
918                        FilenameFilter fnFilter = new FilenameFilter() {
919                                String prefix = spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+arrDev.getDeviceName()+DEVICE_NAME_IN_FILENAME_SEPARATOR;
920                        @Override
921                        public boolean accept(File dir, String name) {
922                            return name.startsWith(prefix) 
923                                        && name.endsWith(spoolFileNameSuffix);
924                        }
925                    };
926                    File[] queuedMessages = dir.listFiles(fnFilter);
927                    byte[] b = null;
928                    while (queuedMessages != null && queuedMessages.length > 0) {
929                        Arrays.sort(queuedMessages, FILE_COMPARATOR);
930                        for (File file : queuedMessages) {
931                            LOG.debug("Read audit message from {}", file);
932                            int len = (int) file.length();
933                            if (b == null || b.length < len)
934                                b = new byte[len];
935                            try {
936                                FileInputStream in = new FileInputStream(file);
937                                try {
938                                    StreamUtils.readFully(in, b, 0, len);
939                                } finally {
940                                    SafeClose.close(in);
941                                }
942                            } catch (IOException e) {
943                                LOG.warn("Failed to read audit message from {}", file, e);
944                                File dest = new File(file.getParent(), file.getName() + ".err");
945                                file.renameTo(dest);
946                                continue;
947                            }
948                        activeConnection(arrDev).sendMessage(new DatagramPacket(b, 0, len));
949                            lastSentTimeInMillis = System.currentTimeMillis();
950                            if (file.delete())
951                                LOG.debug("Delete spool file {}", file);
952                            else
953                                LOG.warn("Failed to delete spool file {}", file);
954                        }
955                        queuedMessages = dir.listFiles(FILENAME_FILTER);
956                    }
957                } catch (Exception e) {
958                    lastException = e;
959                    LOG.info("Failed to send audit message:", e);
960                    failed = true;
961                }
962        }
963        if (failed)
964                scheduleRetry();
965        synchronized (this) {
966            notify();
967        }
968    }
969
        /**
         * Returns the exception most recently recorded by a failed attempt to
         * send a message, either directly or while re-sending spooled
         * messages. The value is not reset by a later successful send in the
         * code shown here.
         *
         * @return the last recorded exception; presumably {@code null} if none
         *         was recorded yet (depends on the field's initial value)
         */
        public Exception getLastException() {
        return lastException;
    }
973
    /**
     * Returns the time of the most recent successful send, as taken from
     * {@code System.currentTimeMillis()} right after the message was handed
     * to the active connection.
     *
     * @return time stamp of the last successful send in milliseconds
     */
    public long getLastSentTimeInMillis() {
        return lastSentTimeInMillis;
    }
977
978    public int getNumberOfQueuedMessages() {
979        int tot = 0;
980        for (Device d : this.auditRecordRepositoryDevices)
981                tot += getNumberOfQueuedMessages(d.getDeviceName());
982        return tot;
983    }
984    
985    public int getNumberOfQueuedMessages(final String deviceName) {
986        try {
987                FilenameFilter fnFilter = new FilenameFilter() {
988                        String prefix = spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+deviceName+DEVICE_NAME_IN_FILENAME_SEPARATOR;
989                @Override
990                public boolean accept(File dir, String name) {
991                    return name.startsWith(prefix) 
992                                && name.endsWith(spoolFileNameSuffix);
993                }
994            };
995
996            return spoolDirectory.list(fnFilter).length;
997        } catch (NullPointerException e) {
998            return 0;
999        }
1000    }
1001
    /**
     * Lists the files in the spool directory that are accepted by
     * {@code FILENAME_FILTER}. Unlike the per-device count, this listing is
     * not restricted to one Audit Record Repository device.
     *
     * @return the matching files, or {@code null} if a
     *         {@code NullPointerException} is raised while listing - which
     *         covers the case that no spool directory is set; the underlying
     *         {@link File#listFiles(FilenameFilter)} may itself return
     *         {@code null} as well
     */
    public File[] getQueuedMessages() {
        try {
            return spoolDirectory.listFiles(FILENAME_FILTER);
        } catch (NullPointerException e) {
            // NOTE(review): the catch stands in for a null check on
            // spoolDirectory, but also hides an NPE thrown by the filter.
            return null;
        }
    }
1009
1010    public synchronized void waitForNoQueuedMessages(long timeout)
1011            throws InterruptedException {
1012        int count;
1013        for (Device arrDev : this.auditRecordRepositoryDevices) {
1014                while ( (count = getNumberOfQueuedMessages(arrDev.getDeviceName())) > 0) {
1015                        LOG.debug("Wait for {} queued Audit Messages for AuditRepository {}!", count, arrDev.getDeviceName());
1016                        wait(timeout);
1017                }
1018        }
1019    }
1020
1021    public synchronized void closeActiveConnection() {
1022        for (Map.Entry<String, ActiveConnection> entry : activeConnection.entrySet()) {
1023            try {
1024                entry.getValue().close();
1025            } catch (IOException e) {
1026                LOG.error("Failed to close active connection to {}", entry.getKey(), e);
1027                throw new AssertionError(e);
1028            }
1029            this.activeConnection.clear();
1030        }
1031    }
1032
1033    private synchronized ActiveConnection activeConnection(Device arrDev)
1034            throws IncompatibleConnectionException {
1035        ActiveConnection activeConnection = this.activeConnection.get(arrDev.getDeviceName());
1036        if (activeConnection != null)
1037            return activeConnection;
1038
1039        AuditRecordRepository arr = arrDev.getDeviceExtension(AuditRecordRepository.class);
1040        if (arr == null)
1041            throw new IllegalStateException("AuditRecordRepositoryDevice "
1042                    + arrDev.getDeviceName()
1043                    + " does not provide Audit Record Repository");
1044
1045        for (Connection remoteConn : arr.getConnections())
1046            if (remoteConn.isInstalled() && remoteConn.isServer())
1047                for (Connection conn : connections)
1048                    if (conn.isInstalled() && conn.isCompatible(remoteConn)) {
1049                        activeConnection = conn.getProtocol().isTCP()
1050                                        ? new TCPConnection(conn, remoteConn)
1051                                        : new UDPConnection(conn, remoteConn);
1052                        this.activeConnection.put(arrDev.getDeviceName(), activeConnection);
1053                        return activeConnection;
1054                    }
1055        throw new IncompatibleConnectionException(
1056                "No compatible connection to " + arr + " available on " + this);
1057    }
1058
1059        public static String processID() {
1060        String s = ManagementFactory.getRuntimeMXBean().getName();
1061        int atPos = s.indexOf('@');
1062        return atPos > 0 ? s.substring(0, atPos)
1063                : Integer.toString(new Random().nextInt() & 0x7fffffff);
1064    }
1065
1066    public static InetAddress localHost() {
1067        try {
1068            return InetAddress.getLocalHost();
1069        } catch (UnknownHostException e) {
1070            return null;
1071        }
1072    }
1073
1074    private Severity severityOf(AuditMessage msg) {
1075        String eventOutcomeIndicator = msg.getEventIdentification()
1076                .getEventOutcomeIndicator();
1077        if (eventOutcomeIndicator.length() == 1)
1078            switch (eventOutcomeIndicator.charAt(0)) {
1079                case '0':
1080                    return successSeverity;
1081                case '4':
1082                    return minorFailureSeverity;
1083                case '8':
1084                    return seriousFailureSeverity;
1085            }
1086        else if (eventOutcomeIndicator.equals("12"))
1087            return majorFailureSeverity;
1088
1089        throw new IllegalArgumentException(
1090                "Illegal eventOutcomeIndicator: " + eventOutcomeIndicator);
1091    }
1092
    /**
     * Computes the numeric priority value written between {@code '<'} and
     * {@code '>'} at the start of the syslog header: the ordinal of the
     * configured facility shifted left by three bits, combined with the
     * ordinal of the severity.
     * <p>
     * NOTE(review): this equals {@code facility * 8 + severity} only if the
     * enum ordinals are the numeric syslog codes and the severity ordinal is
     * below 8 - confirm against the enum declarations.
     *
     * @param severity severity of the message
     * @return priority value for the syslog header
     */
    private int prival(Severity severity) {
        return (facility.ordinal() << 3) | severity.ordinal();
    }
1096
    /**
     * Returns the logger held in the static {@code defaultLogger} field, as
     * set by {@link #setDefaultLogger(AuditLogger)}.
     *
     * @return the current default logger
     */
    public static AuditLogger getDefaultLogger() {
        return defaultLogger;
    }
1100
    /**
     * Replaces the logger held in the static {@code defaultLogger} field.
     *
     * @param defaultLogger logger to be returned by {@link #getDefaultLogger()} from now on
     */
    public static void setDefaultLogger(AuditLogger defaultLogger) {
        AuditLogger.defaultLogger = defaultLogger;
    }
1104
1105    private class MessageBuilder extends ByteArrayOutputStream {
1106
1107        DatagramPacket createMessage(Calendar timeStamp, AuditMessage msg) {
1108            try {
1109                reset();
1110                writeHeader(severityOf(msg), timeStamp);
1111                if (!supplement95) {
1112                    AuditMessages.toXML(msg, builder, formatXML, encoding, schemaURI);
1113                } else {
1114                    AuditMessages.toSupplement95XML(msg, builder, formatXML, encoding, schemaURI);
1115                }
1116            } catch (IOException e) {
1117                throw new RuntimeException(e);
1118            }
1119            return new DatagramPacket(buf, 0, count);
1120        }
1121
1122        DatagramPacket createMessage(Calendar timeStamp, Severity severity,
1123                                     byte[] data, int off, int len) {
1124            try {
1125                reset();
1126                writeHeader(severity, timeStamp);
1127                write(data, off, len);
1128            } catch (IOException e) {
1129                throw new RuntimeException(e);
1130            }
1131            return new DatagramPacket(buf, 0, count);
1132        }
1133
1134        void writeHeader(Severity severity, Calendar timeStamp)
1135                throws IOException {
1136            write('<');
1137            writeInt(prival(severity));
1138            write('>');
1139            write(SYSLOG_VERSION);
1140            write(' ');
1141            write(timeStamp);
1142            write(' ');
1143            if (localHost != null)
1144                write(localHost.getCanonicalHostName().getBytes(encoding));
1145            else
1146                write('-');
1147            write(' ');
1148            write(applicationName().getBytes(encoding));
1149            write(' ');
1150            write(processID.getBytes(encoding));
1151            write(' ');
1152            if (messageID != null)
1153                write(messageID.getBytes(encoding));
1154            else
1155                write('-');
1156            write(' ');
1157            write('-');
1158            write(' ');
1159            if (includeBOM && encoding.equals("UTF-8"))
1160                write(BOM);
1161        }
1162
1163        void writeInt(int i) {
1164            if (i >= 100)
1165                writeNNN(i);
1166            else if (i >= 10)
1167                writeNN(i);
1168            else
1169                writeN(i);
1170        }
1171
1172        void write(Calendar timeStamp) {
1173            writeNNNN(timeStamp.get(Calendar.YEAR));
1174            write('-');
1175            writeNN(timeStamp.get(Calendar.MONTH) + 1);
1176            write('-');
1177            writeNN(timeStamp.get(Calendar.DAY_OF_MONTH));
1178            write('T');
1179            writeNN(timeStamp.get(Calendar.HOUR_OF_DAY));
1180            write(':');
1181            writeNN(timeStamp.get(Calendar.MINUTE));
1182            write(':');
1183            writeNN(timeStamp.get(Calendar.SECOND));
1184            write('.');
1185            writeNNN(timeStamp.get(Calendar.MILLISECOND));
1186            int tzOffset = timeStamp.get(Calendar.ZONE_OFFSET)
1187                    + timeStamp.get(Calendar.DST_OFFSET);
1188            if (tzOffset == 0)
1189                write('Z');
1190            else {
1191                tzOffset /= 60000;
1192                if (tzOffset > 0)
1193                    write('+');
1194                else {
1195                    write('-');
1196                    tzOffset = -tzOffset;
1197                }
1198                writeNN(tzOffset / 60);
1199                write(':');
1200                writeNN(tzOffset % 60);
1201            }
1202        }
1203
1204        void writeNNNN(int i) {
1205            writeNN(i / 100);
1206            writeNN(i % 100);
1207        }
1208
1209        void writeNNN(int i) {
1210            writeN(i / 100);
1211            writeNN(i % 100);
1212        }
1213
1214        void writeNN(int i) {
1215            write(DIGITS_X0[i]);
1216            write(DIGITS_0X[i]);
1217        }
1218
1219        void writeN(int i) {
1220            write(DIGITS_0X[i]);
1221        }
1222
1223    }
1224
1225    private static String toString(DatagramPacket packet) {
1226        try {
1227            int len = packet.getLength();
1228            boolean truncate = len > MSG_PROMPT_LEN;
1229            String s = new String(packet.getData(), 0,
1230                    truncate ? MSG_PROMPT_LEN : len, "UTF-8");
1231            if (truncate)
1232                s += "...";
1233            return s;
1234        } catch (UnsupportedEncodingException e) {
1235            throw new RuntimeException(e);
1236        }
1237    }
1238
    /**
     * Common base of the transport-specific connections to an Audit Record
     * Repository. It pairs a connection of this logger with the connection of
     * the repository it talks to; subclasses implement the actual sending and
     * the {@link Closeable#close()} contract.
     */
    private abstract class ActiveConnection implements Closeable {
        /** Connection of this logger that is used for sending. */
        final Connection conn;
        /** Connection of the Audit Record Repository that is sent to. */
        final Connection remoteConn;

        ActiveConnection(Connection conn, Connection remoteConn) {
            this.conn = conn;
            this.remoteConn = remoteConn;
        }

        /**
         * Sends one syslog message to the remote connection.
         *
         * @param msg packet holding the complete syslog message
         */
        abstract void sendMessage(DatagramPacket msg) throws IOException,
                IncompatibleConnectionException, GeneralSecurityException;

    }
1252
1253    private class UDPConnection extends ActiveConnection {
1254        DatagramSocket ds;
1255
1256        UDPConnection(Connection conn, Connection remoteConn) {
1257            super(conn, remoteConn);
1258        }
1259
1260        @Override
1261        void sendMessage(DatagramPacket msg) throws IOException {
1262
1263            if (ds == null)
1264                ds = conn.createDatagramSocket();
1265
1266            InetSocketAddress endPoint = remoteConn.getEndPoint();
1267            LOG.info("Send audit message to {}", endPoint);
1268            if (LOG.isDebugEnabled())
1269                LOG.debug(AuditLogger.toString(msg));
1270            msg.setSocketAddress(endPoint);
1271            ds.send(msg);
1272        }
1273
1274        @Override
1275        public void close() {
1276            if (ds != null) {
1277                ds.close();
1278                ds = null;
1279            }
1280        }
1281
1282    }
1283
    /**
     * Sends syslog messages over a TCP socket that is opened on demand, kept
     * open between messages, and closed again by an idle timer if the local
     * connection defines a positive idle timeout.
     */
    private class TCPConnection extends ActiveConnection {
        /** Open socket, or {@code null} while not connected. */
        Socket sock;
        /** Output stream of {@link #sock}, or {@code null} while not connected. */
        OutputStream out;
        /** Handle of the pending idle timer, or {@code null} if none is pending. */
        ScheduledFuture<?> idleTimer;

        TCPConnection(Connection conn, Connection remoteConn) {
            super(conn, remoteConn);
        }

        /** Opens the socket to the remote connection unless one is already open. */
        void connect() throws IOException,
                IncompatibleConnectionException, GeneralSecurityException {
            if (sock == null) {
                sock = conn.connect(remoteConn);
                out = sock.getOutputStream();
            }
        }

        /**
         * Sends one message, connecting first if necessary. If writing fails
         * with an {@code IOException}, the socket is closed and re-opened and
         * the write is attempted exactly once more; a failure of that second
         * attempt propagates to the caller. The idle timer is stopped before
         * sending and restarted only after a successful send.
         */
        @Override
        synchronized void sendMessage(DatagramPacket packet) throws IOException,
                IncompatibleConnectionException, GeneralSecurityException {
            stopIdleTimer();
            connect();
            try {
                trySendMessage(packet);
            } catch (IOException e) {
                // The kept-open socket may have gone stale: reconnect and retry once.
                LOG.info("Failed to send audit message to {} - reconnect",
                        sock, e);
                close();
                connect();
                trySendMessage(packet);
            }
            startIdleTimer();
        }

        /**
         * Writes one frame to the socket: the payload length in decimal
         * notation, a single space, then the payload bytes, and flushes.
         * NOTE(review): this looks like syslog octet-counting framing -
         * confirm against the transport specification in use.
         */
        void trySendMessage(DatagramPacket packet) throws IOException {
            LOG.info("Send audit message to {}", sock);
            if (LOG.isDebugEnabled())
                LOG.debug(AuditLogger.toString(packet));
            out.write(Integer.toString(packet.getLength()).getBytes(encoding));
            out.write(' ');
            out.write(packet.getData(), packet.getOffset(), packet.getLength());
            out.flush();
        }

        /**
         * Schedules {@link #onIdleTimerExpired()} after the idle timeout of
         * the local connection, if that timeout is positive. A failure to
         * schedule is logged and otherwise ignored.
         */
        private void startIdleTimer() {
            int idleTimeout = conn.getIdleTimeout();
            if (idleTimeout > 0) {
                LOG.debug("Start Idle timeout of {} ms for {}", idleTimeout, sock);
                try {
                    idleTimer = conn.getDevice().schedule(
                            new Runnable() {
                                @Override
                                public void run() {
                                    onIdleTimerExpired();
                                }
                            },
                            idleTimeout,
                            TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    LOG.warn("Failed to start Idle timeout", e);
                }
            }
        }

        /** Cancels a pending idle timer without interrupting a run already in progress. */
        private void stopIdleTimer() {
            if (idleTimer != null) {
                LOG.debug("Stop Idle timer for {}", sock);
                idleTimer.cancel(false);
                idleTimer = null;
            }
        }

        /** Stops the idle timer and closes the socket. */
        @Override
        public synchronized void close() {
            stopIdleTimer();
            closeSocket();
        }

        /** Closes the socket, if open, and forgets socket and output stream. */
        private void closeSocket() {
            if (sock != null)
                conn.close(sock);
            sock = null;
            out = null;
        }

        /**
         * Invoked by the idle timer. Closes the socket unless the timer field
         * changed between the snapshot taken on entry and the moment the lock
         * was obtained, which is taken as a sign that a send restarted the
         * timer in the meantime.
         * NOTE(review): the snapshot is read outside the synchronized block
         * and is whatever the field holds when this method starts, not
         * necessarily the timer that actually fired - confirm that the
         * comparison detects every restart.
         */
        private void onIdleTimerExpired() {
            ScheduledFuture<?> expiredIdleTimer = idleTimer;
            synchronized (this) {
                if (expiredIdleTimer != idleTimer) {
                    LOG.debug("Detect restart of Idle timer for {}", sock);
                } else {
                    LOG.info("Idle timeout for {} expired", sock);
                    idleTimer = null;
                    closeSocket();
                }
            }
        }

    }
1383
1384    public static Disruptor<AuditMessageEvent> getDisruptor(AuditLogger logger) {
1385        if (disruptor == null)
1386            disruptor = initializeDisruptor(logger);
1387
1388        return disruptor;
1389    }
1390
1391    private static Disruptor<AuditMessageEvent> initializeDisruptor(AuditLogger logger) {
1392        // Executor that will be used to construct new threads for consumers
1393        Executor executor = Executors.newCachedThreadPool();
1394
1395        // The factory for the event
1396        AuditMessageEventFactory factory = new AuditMessageEventFactory();
1397
1398        // Specify the size of the ring buffer, must be power of 2.
1399        int bufferSize = 8;
1400
1401        Disruptor<AuditMessageEvent> disruptorInstance = new Disruptor<AuditMessageEvent>(factory, bufferSize, executor);
1402
1403        // Connect the handler
1404        disruptorInstance.handleEventsWith(new AuditMessageEventHandler());
1405
1406        // Start the Disruptor, starts all threads running
1407        disruptorInstance.start();
1408
1409        return disruptorInstance;
1410    }
1411}