001/* ***** BEGIN LICENSE BLOCK ***** 002 * Version: MPL 1.1/GPL 2.0/LGPL 2.1 003 * 004 * The contents of this file are subject to the Mozilla Public License Version 005 * 1.1 (the "License"); you may not use this file except in compliance with 006 * the License. You may obtain a copy of the License at 007 * http://www.mozilla.org/MPL/ 008 * 009 * Software distributed under the License is distributed on an "AS IS" basis, 010 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 011 * for the specific language governing rights and limitations under the 012 * License. 013 * 014 * The Original Code is part of dcm4che, an implementation of DICOM(TM) in 015 * Java(TM), hosted at https://github.com/gunterze/dcm4che3. 016 * 017 * The Initial Developer of the Original Code is 018 * Agfa Healthcare. 019 * Portions created by the Initial Developer are Copyright (C) 2012 020 * the Initial Developer. All Rights Reserved. 021 * 022 * Contributor(s): 023 * See @authors listed below 024 * 025 * Alternatively, the contents of this file may be used under the terms of 026 * either the GNU General Public License Version 2 or later (the "GPL"), or 027 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 028 * in which case the provisions of the GPL or the LGPL are applicable instead 029 * of those above. If you wish to allow use of your version of this file only 030 * under the terms of either the GPL or the LGPL, and not to allow others to 031 * use your version of this file under the terms of the MPL, indicate your 032 * decision by deleting the provisions above and replace them with the notice 033 * and other provisions required by the GPL or the LGPL. If you do not delete 034 * the provisions above, a recipient may use your version of this file under 035 * the terms of any one of the MPL, the GPL or the LGPL. 036 * 037 * ***** END LICENSE BLOCK ***** */ 038 039package org.dcm4che3.net.audit; 040 041import com.lmax.disruptor.*; 042import com.lmax.disruptor.dsl.Disruptor; 043import com.lmax.disruptor.dsl.ProducerType; 044import org.dcm4che3.audit.*; 045import org.dcm4che3.audit.AuditMessages.RoleIDCode; 046import org.dcm4che3.audit.AuditMessage; 047import org.dcm4che3.conf.core.api.ConfigurableClass; 048import org.dcm4che3.conf.core.api.ConfigurableProperty; 049import org.dcm4che3.conf.core.api.LDAP; 050import org.dcm4che3.net.Connection; 051import org.dcm4che3.net.Device; 052import org.dcm4che3.net.DeviceExtension; 053import org.dcm4che3.net.IncompatibleConnectionException; 054import org.dcm4che3.util.SafeClose; 055import org.dcm4che3.util.StreamUtils; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import javax.net.ssl.SSLContext; 060 061import java.io.*; 062import java.lang.management.ManagementFactory; 063import java.net.*; 064import java.nio.charset.Charset; 065import java.security.GeneralSecurityException; 066import java.util.*; 067import java.util.concurrent.Executor; 068import java.util.concurrent.Executors; 069import java.util.concurrent.ScheduledFuture; 070import java.util.concurrent.TimeUnit; 071 072/** 073 * @author Gunter Zeilinger <gunterze@gmail.com> 074 * @author Michael Backhaus <michael.backhaus@agfa.com> 075 */ 076@LDAP(objectClasses = "dcmAuditLogger") 077@ConfigurableClass 078public class AuditLogger extends DeviceExtension { 079 080 private static final String DICOM_PRIMARY_DEVICE_TYPE = "dicomPrimaryDeviceType"; 081 082 private static final String DEVICE_NAME_IN_FILENAME_SEPARATOR = "-._"; 083 084 private static Disruptor<AuditMessageEvent> disruptor; 085 086 public enum SendStatus { 087 SENT, QUEUED, SUPPRESSED 088 } 089 090 private static final long serialVersionUID = 1595714214186063103L; 091 092 private static final int MSG_PROMPT_LEN = 8192; 093 094 private static Logger LOG = LoggerFactory.getLogger(AuditLogger.class); 095 096 public enum Facility { 097 kern, // (0) -- kernel messages 098 user, // (1) -- user-level messages 099 mail, // (2) -- mail system messages 100 daemon, // (3) -- system daemons' messages 101 auth, // (4) -- authorization messages 102 syslog, // (5) -- messages generated internally by syslogd 103 lpr, // (6) -- line printer subsystem messages 104 news, // (7) -- network news subsystem messages 105 uucp, // (8) -- UUCP subsystem messages 106 cron, // (9) -- clock daemon messages 107 authpriv, // (10)-- security/authorization messages 108 ftp, // (11)-- ftp daemon messages 109 ntp, // (12)-- NTP subsystem messages 110 audit, // (13)-- audit messages 111 console, // (14)-- console messages 112 cron2, // (15)-- clock daemon messages 113 local0, // (16) 114 local1, // (17) 115 local2, // (18) 116 local3, // (19) 117 local4, // (20) 118 local5, // (21) 119 local6, // (22) 120 local7, // (23) 121 } 122 123 public enum Severity { 124 emerg, // (0) -- emergency; system is unusable 125 alert, // (1) -- action must be taken immediately 126 crit, // (2) -- critical condition 127 err, // (3) -- error condition 128 warning, // (4) -- warning condition 129 notice, // (5) -- normal but significant condition 130 info, // (6) -- informational message 131 debug // (7) -- debug-level messages 132 } 133 134 public static final String MESSAGE_ID = "IHE+RFC-3881"; 135 136 private static final int[] DIGITS_0X = { 137 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 138 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 139 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 140 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 141 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 142 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 143 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 144 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 145 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 146 '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 147 }; 148 private static final int[] DIGITS_X0 = { 149 '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', 150 '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', 151 '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', 152 '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', 153 '4', '4', '4', '4', '4', '4', '4', '4', '4', '4', 154 '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', 155 '6', '6', '6', '6', '6', '6', '6', '6', '6', '6', 156 '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', 157 '8', '8', '8', '8', '8', '8', '8', '8', '8', '8', 158 '9', '9', '9', '9', '9', '9', '9', '9', '9', '9', 159 }; 160 private static final byte[] BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; 161 private static final char SYSLOG_VERSION = '1'; 162 private static final InetAddress localHost = localHost(); 163 private static final String processID = processID(); 164 private static final Comparator<File> FILE_COMPARATOR = new Comparator<File>() { 165 @Override 166 public int compare(File o1, File o2) { 167 long diff = o1.lastModified() - o2.lastModified(); 168 return diff < 0 ? -1 : diff > 0 ? 1 : 0; 169 } 170 }; 171 private static volatile AuditLogger defaultLogger; 172 173 @ConfigurableProperty(name = "dcmAuditRecordRepositoryDeviceReference", 174 label = "ARR Device", 175 description = "Devices that correspond to Audit Record Repositories to which audit messages are sent", 176 tags = ConfigurableProperty.Tag.PRIMARY, 177 collectionOfReferences = true) 178 private List<Device> auditRecordRepositoryDevices = new ArrayList<Device>(); 179 180 @ConfigurableProperty( 181 name = "dcmAuditFacility", 182 enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL, 183 defaultValue = "10") 184 private Facility facility = Facility.authpriv; 185 186 @ConfigurableProperty( 187 name = "dcmAuditSuccessSeverity", 188 enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL, 189 defaultValue = "5" 190 ) 191 private Severity successSeverity = Severity.notice; 192 193 @ConfigurableProperty( 194 name = "dcmAuditMinorFailureSeverity", 195 enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL, 196 defaultValue = "4" 197 ) 198 private Severity minorFailureSeverity = Severity.warning; 199 200 @ConfigurableProperty( 201 name = "dcmAuditSeriousFailureSeverity", 202 enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL, 203 defaultValue = "3" 204 ) 205 private Severity seriousFailureSeverity = Severity.err; 206 207 @ConfigurableProperty( 208 name = "dcmAuditMajorFailureSeverity", 209 enumRepresentation = ConfigurableProperty.EnumRepresentation.ORDINAL, 210 defaultValue = "2" 211 ) 212 private Severity majorFailureSeverity = Severity.crit; 213 214 215 @ConfigurableProperty(name = "dcmAuditApplicationName") 216 private String applicationName; 217 218 @ConfigurableProperty(name = "dcmAuditSourceID") 219 private String auditSourceID; 220 221 @ConfigurableProperty(name = "dcmAuditEnterpriseSiteID") 222 private String auditEnterpriseSiteID; 223 224 @ConfigurableProperty(name = "dcmAuditSourceTypeCode") 225 private String[] auditSourceTypeCodes = {}; 226 227 @ConfigurableProperty(name = "dcmAuditMessageID", defaultValue = MESSAGE_ID) 228 private String messageID = MESSAGE_ID; 229 230 @ConfigurableProperty(name = "dcmAuditMessageEncoding", defaultValue = "UTF-8") 231 private String encoding = "UTF-8"; 232 233 @ConfigurableProperty(name = "dcmAuditMessageSchemaURI", defaultValue =AuditMessages.SCHEMA_URI ) 234 private String schemaURI = AuditMessages.SCHEMA_URI; 235 236 @ConfigurableProperty(name = "dcmAuditTimestampInUTC", defaultValue = "false") 237 private boolean timestampInUTC = false; 238 239 @ConfigurableProperty(name = "dcmAuditMessageBOM", defaultValue = "true") 240 private boolean includeBOM = true; 241 242 @ConfigurableProperty(name = "dcmAuditMessageFormatXML", defaultValue = "false") 243 private boolean formatXML; 244 245 @ConfigurableProperty(name = "dcmAuditMessageSupplement95Schema", defaultValue = "false") 246 private boolean supplement95; 247 248 @ConfigurableProperty(name = "dicomInstalled") 249 private Boolean auditLoggerInstalled; 250 251 @ConfigurableProperty(name = "dcmAuditIncludeInstanceUID") 252 private Boolean doIncludeInstanceUID = false; 253 254 @ConfigurableProperty(name = "dcmAuditLoggerSpoolDirectoryURI") 255 private String spoolDirectoryURI; 256 257 private File spoolDirectory; 258 259 private String spoolFileNamePrefix = "audit"; 260 private String spoolFileNameSuffix = ".log"; 261 262 @ConfigurableProperty(name = "dcmAuditLoggerRetryInterval", defaultValue = "0") 263 private int retryInterval; 264 265 @LDAP( 266 noContainerNode = true, 267 distinguishingField = "cn" 268 ) 269 @ConfigurableProperty(name = "dcmAuditSuppressCriteria") 270 private final List<AuditSuppressCriteria> suppressAuditMessageFilters = 271 new ArrayList<AuditSuppressCriteria>(0); 272 273 @ConfigurableProperty(name = "dicomNetworkConnectionReference", 274 label = "Connections", 275 description = "Connections that can be used to send audit messages", 276 tags = ConfigurableProperty.Tag.PRIMARY, 277 collectionOfReferences = true) 278 private List<Connection> connections = new ArrayList<Connection>(1); 279 280 private transient MessageBuilder builder; 281 private transient Map<String,ActiveConnection> activeConnection = new HashMap<String, ActiveConnection>(); 282 private transient ScheduledFuture<?> retryTimer; 283 private transient Exception lastException; 284 private transient long lastSentTimeInMillis; 285 private transient final FilenameFilter FILENAME_FILTER = new FilenameFilter() { 286 @Override 287 public boolean accept(File dir, String name) { 288 return name.startsWith(spoolFileNamePrefix) && name.endsWith(spoolFileNameSuffix); 289 } 290 }; 291 292 293 public List<AuditSuppressCriteria> getSuppressAuditMessageFilters() { 294 return suppressAuditMessageFilters; 295 } 296 297 public void setSuppressAuditMessageFilters(List<AuditSuppressCriteria> suppressAuditMessageFilters) { 298 this.suppressAuditMessageFilters.clear(); 299 for (AuditSuppressCriteria filter : suppressAuditMessageFilters) this.suppressAuditMessageFilters.add(filter); 300 } 301 302 public final List<Device> getAuditRecordRepositoryDevices() { 303 return auditRecordRepositoryDevices; 304 } 305 306 public List<String> getAuditRecordRepositoryDeviceNames() { 307 if (auditRecordRepositoryDevices == null) 308 throw new IllegalStateException("AuditRecordRepositoryDevice not initalized"); 309 List<String> names = new ArrayList<String>(auditRecordRepositoryDevices.size()); 310 for (Device d : auditRecordRepositoryDevices) { 311 names.add(d.getDeviceName()); 312 } 313 return names; 314 } 315 316 public void setAuditRecordRepositoryDevices(List<Device> arrDevices) { 317 for (ActiveConnection c : activeConnection.values()) 318 SafeClose.close(c); 319 activeConnection.clear(); 320 if(arrDevices==null){ 321 this.auditRecordRepositoryDevices.clear(); 322 }else{ 323 this.auditRecordRepositoryDevices = arrDevices; 324 } 325 } 326 327 public void addAuditRecordRepositoryDevice(Device device) { 328 auditRecordRepositoryDevices.add(device); 329 } 330 331 public boolean removeAuditRecordRepositoryDevice(Device device) { 332 return auditRecordRepositoryDevices.remove(device); 333 } 334 335 public final Facility getFacility() { 336 return facility; 337 } 338 339 public final void setFacility(Facility facility) { 340 if (facility == null) 341 throw new NullPointerException(); 342 this.facility = facility; 343 } 344 345 public final Severity getSuccessSeverity() { 346 return successSeverity; 347 } 348 349 public final void setSuccessSeverity(Severity severity) { 350 if (severity == null) 351 throw new NullPointerException(); 352 this.successSeverity = severity; 353 } 354 355 public final Severity getMinorFailureSeverity() { 356 return minorFailureSeverity; 357 } 358 359 public final void setMinorFailureSeverity(Severity severity) { 360 if (severity == null) 361 throw new NullPointerException(); 362 this.minorFailureSeverity = severity; 363 } 364 365 public final Severity getSeriousFailureSeverity() { 366 return seriousFailureSeverity; 367 } 368 369 public final void setSeriousFailureSeverity(Severity severity) { 370 if (severity == null) 371 throw new NullPointerException(); 372 this.seriousFailureSeverity = severity; 373 } 374 375 public final Severity getMajorFailureSeverity() { 376 return majorFailureSeverity; 377 } 378 379 public final void setMajorFailureSeverity(Severity severity) { 380 if (severity == null) 381 throw new NullPointerException(); 382 this.majorFailureSeverity = severity; 383 } 384 385 public final String getApplicationName() { 386 return applicationName; 387 } 388 389 private String applicationName() { 390 return applicationName != null 391 ? applicationName 392 : auditSourceID(); 393 } 394 395 public final void setApplicationName(String applicationName) { 396 this.applicationName = applicationName; 397 } 398 399 public final String getAuditSourceID() { 400 return auditSourceID; 401 } 402 403 public final void setAuditSourceID(String auditSourceID) { 404 this.auditSourceID = auditSourceID; 405 } 406 407 private String auditSourceID() { 408 return auditSourceID != null 409 ? auditSourceID 410 : getDevice().getDeviceName(); 411 } 412 413 public final String getAuditEnterpriseSiteID() { 414 return auditEnterpriseSiteID; 415 } 416 417 public final void setAuditEnterpriseSiteID(String auditEnterpriseSiteID) { 418 this.auditEnterpriseSiteID = auditEnterpriseSiteID; 419 } 420 421 public String[] getAuditSourceTypeCodes() { 422 return auditSourceTypeCodes; 423 } 424 425 public void setAuditSourceTypeCodes(String... auditSourceTypeCode) { 426 this.auditSourceTypeCodes = auditSourceTypeCode; 427 } 428 429 public ActiveParticipant createActiveParticipant( 430 boolean requestor, RoleIDCode... roleIDs) { 431 432 Collection<String> aets = device.getApplicationAETitles(); 433 434 return createActiveParticipant(requestor, 435 processID(), 436 AuditMessages.alternativeUserIDForAETitle( 437 aets.toArray(new String[aets.size()])), 438 applicationName(), 439 localHost().getHostName(), 440 roleIDs); 441 } 442 443 public ActiveParticipant createActiveParticipant( 444 boolean requestor, 445 String userID, 446 String alternativeUserID, 447 String userName, 448 String hostName, 449 RoleIDCode... roleIDs) { 450 ActiveParticipant ap = new ActiveParticipant(); 451 ap.setUserID(userID); 452 ap.setAlternativeUserID(alternativeUserID); 453 ap.setUserName(userName); 454 ap.setUserIsRequestor(requestor); 455 ap.setNetworkAccessPointID(hostName); 456 ap.setNetworkAccessPointTypeCode(AuditMessages.isIP(hostName) 457 ? AuditMessages.NetworkAccessPointTypeCode.IPAddress 458 : AuditMessages.NetworkAccessPointTypeCode.MachineName); 459 for (RoleIDCode roleID : roleIDs) 460 ap.getRoleIDCode().add(roleID); 461 return ap; 462 } 463 464 public AuditSourceIdentification createAuditSourceIdentification() { 465 AuditSourceIdentification asi = new AuditSourceIdentification(); 466 asi.setAuditSourceID(auditSourceID()); 467 if (auditEnterpriseSiteID != null) { 468 if (auditEnterpriseSiteID.equals("dicomInstitutionName")) { 469 String[] institutionNames = getDevice().getInstitutionNames(); 470 if (institutionNames.length > 0) 471 asi.setAuditEnterpriseSiteID(institutionNames[0]); 472 } else 473 asi.setAuditEnterpriseSiteID(auditEnterpriseSiteID); 474 } 475 for (String code : auditSourceTypeCodes) { 476 if (code.equals(DICOM_PRIMARY_DEVICE_TYPE)) { 477 for (String type : device.getPrimaryDeviceTypes()) { 478 AuditSourceTypeCode astc = new AuditSourceTypeCode(); 479 astc.setCode(type); 480 astc.setCodeSystemName("DCM"); 481 asi.getAuditSourceTypeCode().add(astc); 482 } 483 } else { 484 AuditSourceTypeCode astc = new AuditSourceTypeCode(); 485 astc.setCode(code); 486 asi.getAuditSourceTypeCode().add(astc); 487 } 488 } 489 return asi; 490 } 491 492 public final String getMessageID() { 493 return messageID; 494 } 495 496 public final void setMessageID(String messageID) { 497 this.messageID = messageID; 498 } 499 500 public final String getEncoding() { 501 return encoding; 502 } 503 504 public final void setEncoding(String encoding) { 505 if (!Charset.isSupported(encoding)) 506 throw new IllegalArgumentException( 507 "Charset not supported: " + encoding); 508 this.encoding = encoding; 509 } 510 511 public final String getSchemaURI() { 512 return schemaURI; 513 } 514 515 public final void setSchemaURI(String schemaURI) { 516 this.schemaURI = schemaURI; 517 } 518 519 public final boolean isTimestampInUTC() { 520 return timestampInUTC; 521 } 522 523 public final void setTimestampInUTC(boolean timestampInUTC) { 524 this.timestampInUTC = timestampInUTC; 525 } 526 527 public final boolean isIncludeBOM() { 528 return includeBOM; 529 } 530 531 public final void setIncludeBOM(boolean includeBOM) { 532 this.includeBOM = includeBOM; 533 } 534 535 public final boolean isFormatXML() { 536 return formatXML; 537 } 538 539 public final void setFormatXML(boolean formatXML) { 540 this.formatXML = formatXML; 541 } 542 543 public boolean isSupplement95() { 544 return supplement95; 545 } 546 547 public void setSupplement95(boolean sup95) { 548 this.supplement95 = sup95; 549 } 550 551 public boolean isInstalled() { 552 return device != null && device.isInstalled() 553 && (auditLoggerInstalled == null || auditLoggerInstalled.booleanValue()); 554 } 555 556 public final Boolean getAuditLoggerInstalled() { 557 return auditLoggerInstalled; 558 } 559 560 public void setAuditLoggerInstalled(Boolean installed) { 561 if (installed != null && installed.booleanValue() 562 && device != null && !device.isInstalled()) 563 throw new IllegalStateException("owning device not installed"); 564 this.auditLoggerInstalled = installed; 565 } 566 567 568 public Boolean isIncludeInstanceUID() { 569 return doIncludeInstanceUID; 570 } 571 572 public Boolean getDoIncludeInstanceUID() { 573 return doIncludeInstanceUID; 574 } 575 576 public void setDoIncludeInstanceUID(Boolean doIncludeInstanceUID) { 577 this.doIncludeInstanceUID = doIncludeInstanceUID; 578 } 579 580 /** 581 * Get spool directory into which messages failed to sent to the record 582 * repository are stored for later re-send. 583 * 584 * @return The directory in which the messages failed to sent are stored, 585 * or {@code null} if the default temporary-file directory is to 586 * be used 587 */ 588 public File getSpoolDirectory() { 589 return spoolDirectory; 590 } 591 592 /** 593 * Set spool directory into which messages failed sent to the record 594 * repository are stored for later re-send. 595 * 596 * @param directory The directory in which the messages failed to sent are 597 * stored, or {@code null} if the default temporary-file 598 * directory is to be used 599 */ 600 public void setSpoolDirectory(File directory) { 601 this.spoolDirectory = directory; 602 } 603 604 public String getSpoolDirectoryURI() { 605 return spoolDirectory != null ? spoolDirectory.toURI().toString() : null; 606 } 607 608 public void setSpoolDirectoryURI(String uri) { 609 this.spoolDirectory = uri != null ? new File(URI.create(uri)) : null; 610 } 611 612 public String getSpoolNameFilePrefix() { 613 return spoolFileNamePrefix; 614 } 615 616 public void setSpoolFileNamePrefix(String prefix) { 617 if (prefix.length() < 3) 618 throw new IllegalArgumentException("Spool file name prefix too short"); 619 this.spoolFileNamePrefix = prefix; 620 } 621 622 public String getSpoolFileNameSuffix() { 623 return spoolFileNameSuffix; 624 } 625 626 public void setSpoolFileNameSuffix(String suffix) { 627 if (suffix.isEmpty()) 628 throw new IllegalArgumentException("Spool file name suffix cannot be empty"); 629 this.spoolFileNameSuffix = suffix; 630 } 631 632 /** 633 * Get interval in seconds to retry to sent messages which could not be 634 * sent to the record repository or {@code 0} if messages failed to sent 635 * are not spooled for later re-send. 636 * 637 * @return interval retry interval in seconds or {@code 0} 638 * @see #write(Calendar, AuditMessage) 639 */ 640 public int getRetryInterval() { 641 return retryInterval; 642 } 643 644 /** 645 * Set interval in seconds to retry to sent messages which could not be 646 * sent to the record repository or {@code 0} if messages failed to sent 647 * are not spooled for later re-send. 648 * 649 * @param interval retry interval in seconds or {@code 0} 650 * @see #write(Calendar, AuditMessage) 651 */ 652 public void setRetryInterval(int interval) { 653 this.retryInterval = interval; 654 } 655 656 public void addConnection(Connection conn) { 657 if (!conn.getProtocol().isSyslog()) 658 throw new IllegalArgumentException( 659 "Audit Logger does not support protocol " + conn.getProtocol()); 660 if (device != null && device != conn.getDevice()) 661 throw new IllegalStateException(conn + " not contained by " + 662 device.getDeviceName()); 663 connections.add(conn); 664 } 665 666 @Override 667 public void verifyNotUsed(Connection conn) { 668 if (connections.contains(conn)) 669 throw new IllegalStateException(conn + " used by Audit Logger"); 670 } 671 672 public boolean removeConnection(Connection conn) { 673 return connections.remove(conn); 674 } 675 676 677 public void setConnections(List<Connection> connections) { 678 this.connections.clear(); 679 for (Connection connection : connections) addConnection(connection); 680 } 681 682 public List<Connection> getConnections() { 683 return connections; 684 } 685 686 public List<AuditSuppressCriteria> getAuditSuppressCriteriaList() { 687 return suppressAuditMessageFilters; 688 } 689 690 public AuditSuppressCriteria findAuditSuppressCriteriaByCommonName(String cn) { 691 for (AuditSuppressCriteria criteria : suppressAuditMessageFilters) { 692 if (criteria.getCommonName().equals(cn)) 693 return criteria; 694 } 695 return null; 696 } 697 698 public void setAuditSuppressCriteriaList(List<AuditSuppressCriteria> filters) { 699 this.suppressAuditMessageFilters.clear(); 700 this.suppressAuditMessageFilters.addAll(filters); 701 } 702 703 public void addAuditSuppressCriteria(AuditSuppressCriteria criteria) { 704 this.suppressAuditMessageFilters.add(criteria); 705 } 706 707 public void clearAllAuditSuppressCriteria() { 708 this.suppressAuditMessageFilters.clear(); 709 } 710 711 /** 712 * Test if the Event Identification and the Active ActiveParticipant of an 713 * Audit Message matches one of the {@code AuditSuppressCriteria} 714 * 715 * @param msg Audit Message to test 716 * @return {@code true} the specified audit message will be suppressed; 717 * otherwise {@code false} 718 */ 719 public boolean isAuditMessageSuppressed(AuditMessage msg) { 720 for (AuditSuppressCriteria criteria : suppressAuditMessageFilters) { 721 if (criteria.match(msg)) 722 return true; 723 } 724 return false; 725 } 726 727 @Override 728 public void reconfigure(DeviceExtension from) { 729 reconfigure((AuditLogger) from); 730 } 731 732 private void reconfigure(AuditLogger from) { 733 setFacility(from.facility); 734 setSuccessSeverity(from.successSeverity); 735 setMinorFailureSeverity(from.minorFailureSeverity); 736 setSeriousFailureSeverity(from.seriousFailureSeverity); 737 setMajorFailureSeverity(from.majorFailureSeverity); 738 setApplicationName(from.applicationName); 739 setAuditSourceID(from.auditSourceID); 740 setAuditEnterpriseSiteID(from.auditEnterpriseSiteID); 741 setAuditSourceTypeCodes(from.auditSourceTypeCodes); 742 setMessageID(from.messageID); 743 setEncoding(from.encoding); 744 setSchemaURI(from.schemaURI); 745 setTimestampInUTC(from.timestampInUTC); 746 setIncludeBOM(from.includeBOM); 747 setFormatXML(from.formatXML); 748 setSupplement95(from.isSupplement95()); 749 setSpoolDirectory(from.spoolDirectory); 750 setSpoolFileNamePrefix(from.spoolFileNamePrefix); 751 setSpoolFileNameSuffix(from.spoolFileNameSuffix); 752 setRetryInterval(from.retryInterval); 753 setAuditLoggerInstalled(from.auditLoggerInstalled); 754 setAuditRecordRepositoryDevices(from.auditRecordRepositoryDevices); 755 setAuditSuppressCriteriaList(from.suppressAuditMessageFilters); 756 device.reconfigureConnections(connections, from.connections); 757 758 closeActiveConnection(); 759 } 760 761 public Calendar timeStamp() { 762 return timestampInUTC 763 ? new GregorianCalendar(TimeZone.getTimeZone("UTC"), Locale.ENGLISH) 764 : new GregorianCalendar(Locale.ENGLISH); 765 } 766 767 /** 768 * Send Audit Message by Syslog Protocol to Audit Record Repository, if the 769 * message does not match any configured {@code AuditSuppressCriteria}. If 770 * an I/O error occurs sending the message to the {@code AuditRecordRepository} 771 * and if a {@code RetryInterval) is configured, the message will be spooled 772 * into the configured {@code SpoolDirectory} for later re-send and the 773 * method returns {@code false}. If no {@code RetryInterval} is configured, 774 * the method throws an {@code IOException) if an I/O error occurs sending 775 * the message. 776 * <p/> 777 * Attention: sending via UDP without getting an I/O error does not ensure 778 * that the Audit Record Repository actually received the message! 779 * 780 * @param timeStamp included in Syslog Header 781 * @param msg Audit Message 782 * @return {@code SendStatus.SUPPRESSED} if the message was suppressed; 783 * {@code SendStatus.SENT} if the message was successfully emitted; 784 * {@code SendStatus.QUEUED} if the message was spooled for later re-send 785 * @throws IllegalStateException if there is no {@code AuditRecordRepository} associated with 786 * this {@code AuditLogger} 787 * @throws IncompatibleConnectionException if no {@code Connection) of this {@code AuditLogger} is compatible 788 * with any {@code Connection) of the associated {@code AuditRecordRepository} 789 * @throws GeneralSecurityException if the {@link SSLContext} could not get intialized from configured 790 * private key and public certificates 791 * @throws IOException if an I/O error occurs sending the message to the {@code AuditRecordRepository} 792 * or on spooling the message to the file system 793 */ 794 public SendStatus write(Calendar timeStamp, AuditMessage msg) 795 throws IncompatibleConnectionException, GeneralSecurityException, IOException { 796 if (isAuditMessageSuppressed(msg)) 797 return SendStatus.SUPPRESSED; 798 799 return sendMessage(builder().createMessage(timeStamp, msg)); 800 } 801 802 public void writeAsync(Calendar timeStamp, AuditMessage msg) 803 throws IncompatibleConnectionException, GeneralSecurityException, IOException, InsufficientCapacityException { 804 805 if (isAuditMessageSuppressed(msg)) 806 return; 807 808 RingBuffer<AuditMessageEvent> ringBuffer = getDisruptor(this).getRingBuffer(); 809 810 long sequence = ringBuffer.next(); // Grab the next sequence 811 try 812 { 813 AuditMessageEvent msgenrtry = ringBuffer.get(sequence); // Get the entry in the Disruptor 814 // for the sequence 815 msgenrtry.setLogger(this); // Fill with data 816 msgenrtry.setMessage(msg); 817 } 818 finally 819 { 820 ringBuffer.publish(sequence); 821 } 822 } 823 824 public SendStatus write(Calendar timeStamp, Severity severity, 825 byte[] data, int off, int len) 826 throws IncompatibleConnectionException, GeneralSecurityException, IOException { 827 return sendMessage( 828 builder().createMessage(timeStamp, severity, data, off, len)); 829 } 830 831 private MessageBuilder builder() { 832 if (builder == null) 833 builder = new MessageBuilder(); 834 835 return builder; 836 } 837 838 private SendStatus sendMessage(DatagramPacket msg) throws IncompatibleConnectionException, 839 GeneralSecurityException, IOException { 840 String deviceName; 841 SendStatus status = SendStatus.SENT; 842 for (Device arrDev : auditRecordRepositoryDevices) { 843 deviceName = arrDev.getDeviceName(); 844 if (getNumberOfQueuedMessages(deviceName) > 0) { 845 spoolMessage(deviceName, msg); 846 } else { 847 try { 848 activeConnection(arrDev).sendMessage(msg); 849 lastSentTimeInMillis = System.currentTimeMillis(); 850 } catch (IOException e) { 851 lastException = e; 852 if (retryInterval > 0) { 853 LOG.info("Failed to send audit message:", e); 854 spoolMessage(deviceName, msg); 855 scheduleRetry(); 856 status = SendStatus.QUEUED; 857 } else { 858 throw e; 859 } 860 } 861 } 862 } 863 return status; 864 } 865 866 private synchronized void scheduleRetry() { 867 if (retryTimer != null || retryInterval <= 0) { 868 return; 869 } 870 871 LOG.debug("Scheduled retry in {} s", retryInterval); 872 retryTimer = getDevice().schedule( 873 new Runnable() { 874 @Override 875 public void run() { 876 synchronized (AuditLogger.this) { 877 retryTimer = null; 878 } 879 sendQueuedMessages(); 880 } 881 }, 882 retryInterval, TimeUnit.SECONDS); 883 } 884 885 private void spoolMessage(String deviceName, DatagramPacket msg) throws IOException { 886 if (spoolDirectory != null) 887 spoolDirectory.mkdirs(); 888 889 File f = null; 890 try { 891 f = File.createTempFile(spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+deviceName+DEVICE_NAME_IN_FILENAME_SEPARATOR, spoolFileNameSuffix, spoolDirectory); 892 if (spoolDirectory == null) 893 spoolDirectory = f.getParentFile(); 894 895 LOG.info("Spool audit message to {}", f); 896 FileOutputStream out = new FileOutputStream(f); 897 try { 898 out.write(msg.getData(), msg.getOffset(), msg.getLength()); 899 } finally { 900 SafeClose.close(out); 901 } 902 f = null; 903 } catch (IOException e) { 904 throw new IOException("Failed to spool audit message for device "+deviceName, e); 905 } finally { 906 if (f != null) 907 f.delete(); 908 } 909 } 910 911 public void sendQueuedMessages() { 912 File dir = spoolDirectory; 913 if (dir == null) 914 return; 915 boolean failed = false; 916 for (final Device arrDev : this.auditRecordRepositoryDevices) { 917 try { 918 FilenameFilter fnFilter = new FilenameFilter() { 919 String prefix = spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+arrDev.getDeviceName()+DEVICE_NAME_IN_FILENAME_SEPARATOR; 920 @Override 921 public boolean accept(File dir, String name) { 922 return name.startsWith(prefix) 923 && name.endsWith(spoolFileNameSuffix); 924 } 925 }; 926 File[] queuedMessages = dir.listFiles(fnFilter); 927 byte[] b = null; 928 while (queuedMessages != null && queuedMessages.length > 0) { 929 Arrays.sort(queuedMessages, FILE_COMPARATOR); 930 for (File file : queuedMessages) { 931 LOG.debug("Read audit message from {}", file); 932 int len = (int) file.length(); 933 if (b == null || b.length < len) 934 b = new byte[len]; 935 try { 936 FileInputStream in = new FileInputStream(file); 937 try { 938 StreamUtils.readFully(in, b, 0, len); 939 } finally { 940 SafeClose.close(in); 941 } 942 } catch (IOException e) { 943 LOG.warn("Failed to read audit message from {}", file, e); 944 File dest = new File(file.getParent(), file.getName() + ".err"); 945 file.renameTo(dest); 946 continue; 947 } 948 activeConnection(arrDev).sendMessage(new DatagramPacket(b, 0, len)); 949 lastSentTimeInMillis = System.currentTimeMillis(); 950 if (file.delete()) 951 LOG.debug("Delete spool file {}", file); 952 else 953 LOG.warn("Failed to delete spool file {}", file); 954 } 955 queuedMessages = dir.listFiles(FILENAME_FILTER); 956 } 957 } catch (Exception e) { 958 lastException = e; 959 LOG.info("Failed to send audit message:", e); 960 failed = true; 961 } 962 } 963 if (failed) 964 scheduleRetry(); 965 synchronized (this) { 966 notify(); 967 } 968 } 969 970 public Exception getLastException() { 971 return lastException; 972 } 973 974 public long getLastSentTimeInMillis() { 975 return lastSentTimeInMillis; 976 } 977 978 public int getNumberOfQueuedMessages() { 979 int tot = 0; 980 for (Device d : this.auditRecordRepositoryDevices) 981 tot += getNumberOfQueuedMessages(d.getDeviceName()); 982 return tot; 983 } 984 985 public int getNumberOfQueuedMessages(final String deviceName) { 986 try { 987 FilenameFilter fnFilter = new FilenameFilter() { 988 String prefix = spoolFileNamePrefix+DEVICE_NAME_IN_FILENAME_SEPARATOR+deviceName+DEVICE_NAME_IN_FILENAME_SEPARATOR; 989 @Override 990 public boolean accept(File dir, String name) { 991 return name.startsWith(prefix) 992 && name.endsWith(spoolFileNameSuffix); 993 } 994 }; 995 996 return spoolDirectory.list(fnFilter).length; 997 } catch (NullPointerException e) { 998 return 0; 999 } 1000 } 1001 1002 public File[] getQueuedMessages() { 1003 try { 1004 return spoolDirectory.listFiles(FILENAME_FILTER); 1005 } catch (NullPointerException e) { 1006 return null; 1007 } 1008 } 1009 1010 public synchronized void waitForNoQueuedMessages(long timeout) 1011 throws InterruptedException { 1012 int count; 1013 for (Device arrDev : this.auditRecordRepositoryDevices) { 1014 while ( (count = getNumberOfQueuedMessages(arrDev.getDeviceName())) > 0) { 1015 LOG.debug("Wait for {} queued Audit Messages for AuditRepository {}!", count, arrDev.getDeviceName()); 1016 wait(timeout); 1017 } 1018 } 1019 } 1020 1021 public synchronized void closeActiveConnection() { 1022 for (Map.Entry<String, ActiveConnection> entry : activeConnection.entrySet()) { 1023 try { 1024 entry.getValue().close(); 1025 } catch (IOException e) { 1026 LOG.error("Failed to close active connection to {}", entry.getKey(), e); 1027 throw new AssertionError(e); 1028 } 1029 this.activeConnection.clear(); 1030 } 1031 } 1032 1033 private synchronized ActiveConnection activeConnection(Device arrDev) 1034 throws IncompatibleConnectionException { 1035 ActiveConnection activeConnection = this.activeConnection.get(arrDev.getDeviceName()); 1036 if (activeConnection != null) 1037 return activeConnection; 1038 1039 AuditRecordRepository arr = arrDev.getDeviceExtension(AuditRecordRepository.class); 1040 if (arr == null) 1041 throw new IllegalStateException("AuditRecordRepositoryDevice " 1042 + arrDev.getDeviceName() 1043 + " does not provide Audit Record Repository"); 1044 1045 for (Connection remoteConn : arr.getConnections()) 1046 if (remoteConn.isInstalled() && remoteConn.isServer()) 1047 for (Connection conn : connections) 1048 if (conn.isInstalled() && conn.isCompatible(remoteConn)) { 1049 activeConnection = conn.getProtocol().isTCP() 1050 ? new TCPConnection(conn, remoteConn) 1051 : new UDPConnection(conn, remoteConn); 1052 this.activeConnection.put(arrDev.getDeviceName(), activeConnection); 1053 return activeConnection; 1054 } 1055 throw new IncompatibleConnectionException( 1056 "No compatible connection to " + arr + " available on " + this); 1057 } 1058 1059 public static String processID() { 1060 String s = ManagementFactory.getRuntimeMXBean().getName(); 1061 int atPos = s.indexOf('@'); 1062 return atPos > 0 ? s.substring(0, atPos) 1063 : Integer.toString(new Random().nextInt() & 0x7fffffff); 1064 } 1065 1066 public static InetAddress localHost() { 1067 try { 1068 return InetAddress.getLocalHost(); 1069 } catch (UnknownHostException e) { 1070 return null; 1071 } 1072 } 1073 1074 private Severity severityOf(AuditMessage msg) { 1075 String eventOutcomeIndicator = msg.getEventIdentification() 1076 .getEventOutcomeIndicator(); 1077 if (eventOutcomeIndicator.length() == 1) 1078 switch (eventOutcomeIndicator.charAt(0)) { 1079 case '0': 1080 return successSeverity; 1081 case '4': 1082 return minorFailureSeverity; 1083 case '8': 1084 return seriousFailureSeverity; 1085 } 1086 else if (eventOutcomeIndicator.equals("12")) 1087 return majorFailureSeverity; 1088 1089 throw new IllegalArgumentException( 1090 "Illegal eventOutcomeIndicator: " + eventOutcomeIndicator); 1091 } 1092 1093 private int prival(Severity severity) { 1094 return (facility.ordinal() << 3) | severity.ordinal(); 1095 } 1096 1097 public static AuditLogger getDefaultLogger() { 1098 return defaultLogger; 1099 } 1100 1101 public static void setDefaultLogger(AuditLogger defaultLogger) { 1102 AuditLogger.defaultLogger = defaultLogger; 1103 } 1104 1105 private class MessageBuilder extends ByteArrayOutputStream { 1106 1107 DatagramPacket createMessage(Calendar timeStamp, AuditMessage msg) { 1108 try { 1109 reset(); 1110 writeHeader(severityOf(msg), timeStamp); 1111 if (!supplement95) { 1112 AuditMessages.toXML(msg, builder, formatXML, encoding, schemaURI); 1113 } else { 1114 AuditMessages.toSupplement95XML(msg, builder, formatXML, encoding, schemaURI); 1115 } 1116 } catch (IOException e) { 1117 throw new RuntimeException(e); 1118 } 1119 return new DatagramPacket(buf, 0, count); 1120 } 1121 1122 DatagramPacket createMessage(Calendar timeStamp, Severity severity, 1123 byte[] data, int off, int len) { 1124 try { 1125 reset(); 1126 writeHeader(severity, timeStamp); 1127 write(data, off, len); 1128 } catch (IOException e) { 1129 throw new RuntimeException(e); 1130 } 1131 return new DatagramPacket(buf, 0, count); 1132 } 1133 1134 void writeHeader(Severity severity, Calendar timeStamp) 1135 throws IOException { 1136 write('<'); 1137 writeInt(prival(severity)); 1138 write('>'); 1139 write(SYSLOG_VERSION); 1140 write(' '); 1141 write(timeStamp); 1142 write(' '); 1143 if (localHost != null) 1144 write(localHost.getCanonicalHostName().getBytes(encoding)); 1145 else 1146 write('-'); 1147 write(' '); 1148 write(applicationName().getBytes(encoding)); 1149 write(' '); 1150 write(processID.getBytes(encoding)); 1151 write(' '); 1152 if (messageID != null) 1153 write(messageID.getBytes(encoding)); 1154 else 1155 write('-'); 1156 write(' '); 1157 write('-'); 1158 write(' '); 1159 if (includeBOM && encoding.equals("UTF-8")) 1160 write(BOM); 1161 } 1162 1163 void writeInt(int i) { 1164 if (i >= 100) 1165 writeNNN(i); 1166 else if (i >= 10) 1167 writeNN(i); 1168 else 1169 writeN(i); 1170 } 1171 1172 void write(Calendar timeStamp) { 1173 writeNNNN(timeStamp.get(Calendar.YEAR)); 1174 write('-'); 1175 writeNN(timeStamp.get(Calendar.MONTH) + 1); 1176 write('-'); 1177 writeNN(timeStamp.get(Calendar.DAY_OF_MONTH)); 1178 write('T'); 1179 writeNN(timeStamp.get(Calendar.HOUR_OF_DAY)); 1180 write(':'); 1181 writeNN(timeStamp.get(Calendar.MINUTE)); 1182 write(':'); 1183 writeNN(timeStamp.get(Calendar.SECOND)); 1184 write('.'); 1185 writeNNN(timeStamp.get(Calendar.MILLISECOND)); 1186 int tzOffset = timeStamp.get(Calendar.ZONE_OFFSET) 1187 + timeStamp.get(Calendar.DST_OFFSET); 1188 if (tzOffset == 0) 1189 write('Z'); 1190 else { 1191 tzOffset /= 60000; 1192 if (tzOffset > 0) 1193 write('+'); 1194 else { 1195 write('-'); 1196 tzOffset = -tzOffset; 1197 } 1198 writeNN(tzOffset / 60); 1199 write(':'); 1200 writeNN(tzOffset % 60); 1201 } 1202 } 1203 1204 void writeNNNN(int i) { 1205 writeNN(i / 100); 1206 writeNN(i % 100); 1207 } 1208 1209 void writeNNN(int i) { 1210 writeN(i / 100); 1211 writeNN(i % 100); 1212 } 1213 1214 void writeNN(int i) { 1215 write(DIGITS_X0[i]); 1216 write(DIGITS_0X[i]); 1217 } 1218 1219 void writeN(int i) { 1220 write(DIGITS_0X[i]); 1221 } 1222 1223 } 1224 1225 private static String toString(DatagramPacket packet) { 1226 try { 1227 int len = packet.getLength(); 1228 boolean truncate = len > MSG_PROMPT_LEN; 1229 String s = new String(packet.getData(), 0, 1230 truncate ? MSG_PROMPT_LEN : len, "UTF-8"); 1231 if (truncate) 1232 s += "..."; 1233 return s; 1234 } catch (UnsupportedEncodingException e) { 1235 throw new RuntimeException(e); 1236 } 1237 } 1238 1239 private abstract class ActiveConnection implements Closeable { 1240 final Connection conn; 1241 final Connection remoteConn; 1242 1243 ActiveConnection(Connection conn, Connection remoteConn) { 1244 this.conn = conn; 1245 this.remoteConn = remoteConn; 1246 } 1247 1248 abstract void sendMessage(DatagramPacket msg) throws IOException, 1249 IncompatibleConnectionException, GeneralSecurityException; 1250 1251 } 1252 1253 private class UDPConnection extends ActiveConnection { 1254 DatagramSocket ds; 1255 1256 UDPConnection(Connection conn, Connection remoteConn) { 1257 super(conn, remoteConn); 1258 } 1259 1260 @Override 1261 void sendMessage(DatagramPacket msg) throws IOException { 1262 1263 if (ds == null) 1264 ds = conn.createDatagramSocket(); 1265 1266 InetSocketAddress endPoint = remoteConn.getEndPoint(); 1267 LOG.info("Send audit message to {}", endPoint); 1268 if (LOG.isDebugEnabled()) 1269 LOG.debug(AuditLogger.toString(msg)); 1270 msg.setSocketAddress(endPoint); 1271 ds.send(msg); 1272 } 1273 1274 @Override 1275 public void close() { 1276 if (ds != null) { 1277 ds.close(); 1278 ds = null; 1279 } 1280 } 1281 1282 } 1283 1284 private class TCPConnection extends ActiveConnection { 1285 Socket sock; 1286 OutputStream out; 1287 ScheduledFuture<?> idleTimer; 1288 1289 TCPConnection(Connection conn, Connection remoteConn) { 1290 super(conn, remoteConn); 1291 } 1292 1293 void connect() throws IOException, 1294 IncompatibleConnectionException, GeneralSecurityException { 1295 if (sock == null) { 1296 sock = conn.connect(remoteConn); 1297 out = sock.getOutputStream(); 1298 } 1299 } 1300 1301 @Override 1302 synchronized void sendMessage(DatagramPacket packet) throws IOException, 1303 IncompatibleConnectionException, GeneralSecurityException { 1304 stopIdleTimer(); 1305 connect(); 1306 try { 1307 trySendMessage(packet); 1308 } catch (IOException e) { 1309 LOG.info("Failed to send audit message to {} - reconnect", 1310 sock, e); 1311 close(); 1312 connect(); 1313 trySendMessage(packet); 1314 } 1315 startIdleTimer(); 1316 } 1317 1318 void trySendMessage(DatagramPacket packet) throws IOException { 1319 LOG.info("Send audit message to {}", sock); 1320 if (LOG.isDebugEnabled()) 1321 LOG.debug(AuditLogger.toString(packet)); 1322 out.write(Integer.toString(packet.getLength()).getBytes(encoding)); 1323 out.write(' '); 1324 out.write(packet.getData(), packet.getOffset(), packet.getLength()); 1325 out.flush(); 1326 } 1327 1328 private void startIdleTimer() { 1329 int idleTimeout = conn.getIdleTimeout(); 1330 if (idleTimeout > 0) { 1331 LOG.debug("Start Idle timeout of {} ms for {}", idleTimeout, sock); 1332 try { 1333 idleTimer = conn.getDevice().schedule( 1334 new Runnable() { 1335 @Override 1336 public void run() { 1337 onIdleTimerExpired(); 1338 } 1339 }, 1340 idleTimeout, 1341 TimeUnit.MILLISECONDS); 1342 } catch (Exception e) { 1343 LOG.warn("Failed to start Idle timeout", e); 1344 } 1345 } 1346 } 1347 1348 private void stopIdleTimer() { 1349 if (idleTimer != null) { 1350 LOG.debug("Stop Idle timer for {}", sock); 1351 idleTimer.cancel(false); 1352 idleTimer = null; 1353 } 1354 } 1355 1356 @Override 1357 public synchronized void close() { 1358 stopIdleTimer(); 1359 closeSocket(); 1360 } 1361 1362 private void closeSocket() { 1363 if (sock != null) 1364 conn.close(sock); 1365 sock = null; 1366 out = null; 1367 } 1368 1369 private void onIdleTimerExpired() { 1370 ScheduledFuture<?> expiredIdleTimer = idleTimer; 1371 synchronized (this) { 1372 if (expiredIdleTimer != idleTimer) { 1373 LOG.debug("Detect restart of Idle timer for {}", sock); 1374 } else { 1375 LOG.info("Idle timeout for {} expired", sock); 1376 idleTimer = null; 1377 closeSocket(); 1378 } 1379 } 1380 } 1381 1382 } 1383 1384 public static Disruptor<AuditMessageEvent> getDisruptor(AuditLogger logger) { 1385 if (disruptor == null) 1386 disruptor = initializeDisruptor(logger); 1387 1388 return disruptor; 1389 } 1390 1391 private static Disruptor<AuditMessageEvent> initializeDisruptor(AuditLogger logger) { 1392 // Executor that will be used to construct new threads for consumers 1393 Executor executor = Executors.newCachedThreadPool(); 1394 1395 // The factory for the event 1396 AuditMessageEventFactory factory = new AuditMessageEventFactory(); 1397 1398 // Specify the size of the ring buffer, must be power of 2. 1399 int bufferSize = 8; 1400 1401 Disruptor<AuditMessageEvent> disruptorInstance = new Disruptor<AuditMessageEvent>(factory, bufferSize, executor); 1402 1403 // Connect the handler 1404 disruptorInstance.handleEventsWith(new AuditMessageEventHandler()); 1405 1406 // Start the Disruptor, starts all threads running 1407 disruptorInstance.start(); 1408 1409 return disruptorInstance; 1410 } 1411}