001/* ***** BEGIN LICENSE BLOCK ***** 002 * Version: MPL 1.1/GPL 2.0/LGPL 2.1 003 * 004 * The contents of this file are subject to the Mozilla Public License Version 005 * 1.1 (the "License"); you may not use this file except in compliance with 006 * the License. You may obtain a copy of the License at 007 * http://www.mozilla.org/MPL/ 008 * 009 * Software distributed under the License is distributed on an "AS IS" basis, 010 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 011 * for the specific language governing rights and limitations under the 012 * License. 013 * 014 * The Original Code is part of dcm4che, an implementation of DICOM(TM) in 015 * Java(TM), hosted at https://github.com/gunterze/dcm4che. 016 * 017 * The Initial Developer of the Original Code is 018 * Agfa Healthcare. 019 * Portions created by the Initial Developer are Copyright (C) 2011 020 * the Initial Developer. All Rights Reserved. 021 * 022 * Contributor(s): 023 * See @authors listed below 024 * 025 * Alternatively, the contents of this file may be used under the terms of 026 * either the GNU General Public License Version 2 or later (the "GPL"), or 027 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), 028 * in which case the provisions of the GPL or the LGPL are applicable instead 029 * of those above. If you wish to allow use of your version of this file only 030 * under the terms of either the GPL or the LGPL, and not to allow others to 031 * use your version of this file under the terms of the MPL, indicate your 032 * decision by deleting the provisions above and replace them with the notice 033 * and other provisions required by the GPL or the LGPL. If you do not delete 034 * the provisions above, a recipient may use your version of this file under 035 * the terms of any one of the MPL, the GPL or the LGPL. 036 * 037 * ***** END LICENSE BLOCK ***** */ 038 039package org.dcm4che3.net.service; 040 041import java.io.IOException; 042import java.util.ArrayList; 043import java.util.Collections; 044import java.util.Iterator; 045import java.util.List; 046import java.util.Observable; 047 048import org.dcm4che3.data.Attributes; 049import org.dcm4che3.data.Tag; 050import org.dcm4che3.data.UID; 051import org.dcm4che3.io.DicomInputStream; 052import org.dcm4che3.net.Association; 053import org.dcm4che3.net.DataWriter; 054import org.dcm4che3.net.DimseRSPHandler; 055import org.dcm4che3.net.InputStreamDataWriter; 056import org.dcm4che3.net.Status; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060/** 061 * @param <T> 062 * instance locator type 063 * 064 * @author Umberto Cappellini <umberto.cappellini@agfa.com> 065 */ 066public class BasicCStoreSCU<T extends InstanceLocator> extends Observable 067 implements CStoreSCU<T> { 068 069 protected static final Logger LOG = LoggerFactory 070 .getLogger(BasicCStoreSCU.class); 071 072 protected volatile int status = Status.Pending; 073 protected int priority = 0; 074 protected int nr_instances; 075 protected List<T> completed = Collections.synchronizedList(new ArrayList<T>()); 076 protected List<T> warning = Collections.synchronizedList(new ArrayList<T>()); 077 protected List<T> failed = Collections.synchronizedList(new ArrayList<T>()); 078 protected int outstandingRSP = 0; 079 protected Object outstandingRSPLock = new Object(); 080 081 @Override 082 public int getStatus() { 083 return status; 084 } 085 086 @Override 087 public boolean cancel() { 088 if (status==Status.Pending) { 089 this.status = Status.Cancel; 090 return true; 091 } 092 return false; 093 } 094 095 @Override 096 public int getPriority() { 097 return priority; 098 } 099 100 @Override 101 public List<T> getCompleted() { 102 return completed; 103 } 104 105 @Override 106 public List<T> getWarning() { 107 return warning; 108 } 109 110 @Override 111 public List<T> getFailed() { 112 return failed; 113 } 114 115 @Override 116 public int getRemaining() { 117 return (nr_instances - completed.size() - warning.size() - failed 118 .size()); 119 } 120 121 @Override 122 public BasicCStoreSCUResp cstore(List<T> instances, Association storeas, 123 int priority) { 124 125 if (storeas == null) 126 throw new IllegalStateException("null Store Association"); 127 128 if (instances == null) 129 throw new IllegalStateException("null Store Instances"); 130 131 nr_instances = instances.size(); 132 133 try { 134 for (Iterator<T> iter = instances.iterator(); iter.hasNext();) { 135 T inst = iter.next(); 136 137 if (status == Status.Cancel) 138 break; 139 140 try { 141 storeInstance(storeas, inst); 142 } catch (Throwable e) { 143 LOG.warn( 144 "Unable to perform sub-operation on association to {}", 145 storeas.getRemoteAET(), e); 146 failed.add(inst); 147 while (iter.hasNext()) 148 failed.add(iter.next()); 149 } 150 } 151 waitForOutstandingCStoreRSP(storeas); 152 153 setFinalStatus(); 154 155 return makeRSP(status); 156 } finally { 157 try { 158 close(); 159 } catch (Throwable e) { 160 LOG.warn("Exception thrown by {}.close()", 161 getClass().getName(), e); 162 } 163 } 164 } 165 166 protected void storeInstance(Association storeas, T inst) throws IOException, InterruptedException { 167 String tsuid; 168 DataWriter dataWriter; 169 try { 170 tsuid = selectTransferSyntaxFor(storeas, inst); 171 dataWriter = createDataWriter(inst, tsuid); 172 } catch (Exception e) { 173 LOG.info("Unable to store {}/{} to {}", 174 UID.nameOf(inst.cuid), UID.nameOf(inst.tsuid), 175 storeas.getRemoteAET(), e); 176 failed.add(inst); 177 return; 178 } 179 180 cstore(storeas, inst, tsuid, dataWriter); 181 } 182 183 private void setFinalStatus() { 184 185 if (status!=Status.Cancel) { 186 if (failed.size() > 0) { 187 if (failed.size() == nr_instances) 188 status = Status.UnableToPerformSubOperations; 189 else 190 status = Status.OneOrMoreFailures; 191 } else { 192 status = Status.Success; 193 } 194 } 195 } 196 197 198 private void waitForOutstandingCStoreRSP(Association storeas) { 199 try { 200 synchronized (outstandingRSPLock) { 201 while (outstandingRSP > 0) 202 outstandingRSPLock.wait(); 203 } 204 } catch (InterruptedException e) { 205 LOG.warn("Failed to wait for outstanding RSP on association to {}", 206 storeas.getRemoteAET(), e); 207 } 208 } 209 210 protected void releaseStoreAssociation(Association storeas) { 211 try { 212 storeas.release(); 213 } catch (IOException e) { 214 LOG.warn("Failed to release association to {}", 215 storeas.getRemoteAET(), e); 216 } 217 } 218 219 protected int cstore(Association storeas, T inst, String tsuid, 220 DataWriter dataWriter) throws IOException, InterruptedException { 221 int messageID = storeas.nextMessageID(); 222 DimseRSPHandler rspHandler = new CStoreRSPHandler(messageID, inst); 223 storeas.cstore(inst.cuid, inst.iuid, priority, dataWriter, tsuid, 224 rspHandler); 225 synchronized (outstandingRSPLock) { 226 outstandingRSP++; 227 } 228 return messageID; 229 } 230 231 private final class CStoreRSPHandler extends DimseRSPHandler { 232 233 private final T inst; 234 235 public CStoreRSPHandler(int msgId, T inst) { 236 super(msgId); 237 this.inst = inst; 238 } 239 240 @Override 241 public void onDimseRSP(Association as, Attributes cmd, Attributes data) { 242 super.onDimseRSP(as, cmd, data); 243 int storeStatus = cmd.getInt(Tag.Status, -1); 244 if (storeStatus == Status.Success) 245 completed.add(inst); 246 else if ((storeStatus & 0xB000) == 0xB000) 247 warning.add(inst); 248 else 249 failed.add(inst); 250 251 synchronized (outstandingRSPLock) { 252 if (--outstandingRSP == 0) 253 outstandingRSPLock.notify(); 254 } 255 256 setChanged(); 257 notifyObservers(); // notify observers of received rsp 258 } 259 260 @Override 261 public void onClose(Association as) { 262 super.onClose(as); 263 synchronized (outstandingRSPLock) { 264 outstandingRSP = 0; 265 outstandingRSPLock.notify(); 266 } 267 } 268 } 269 270 protected String selectTransferSyntaxFor(Association storeas, T inst) 271 throws Exception { 272 return inst.tsuid; 273 } 274 275 protected DataWriter createDataWriter(T inst, String tsuid) 276 throws Exception { 277 DicomInputStream in = new DicomInputStream(inst.getFile()); 278 in.readFileMetaInformation(); 279 return new InputStreamDataWriter(in); 280 } 281 282 /** 283 * returns an aggregated store rsp 284 */ 285 private BasicCStoreSCUResp makeRSP(int status) { 286 287 BasicCStoreSCUResp rsp = new BasicCStoreSCUResp(); 288 rsp.setStatus(status); 289 rsp.setCompleted(completed.size()); 290 rsp.setFailed(failed.size()); 291 rsp.setWarning(warning.size()); 292 if (!failed.isEmpty()) { 293 String[] iuids = new String[failed.size()]; 294 for (int i = 0; i < iuids.length; i++) 295 iuids[i] = failed.get(0).iuid; 296 rsp.setFailedUIDs(iuids); 297 } 298 return rsp; 299 } 300 301 protected void close() { 302 } 303 304}