001/* 002 * Copyright 2019 Perihelios LLC 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package com.perihelios.aws.lambda.cloudwatch.dispatcher; 017 018import com.amazonaws.services.lambda.runtime.Context; 019import com.google.gson.Gson; 020import com.google.gson.GsonBuilder; 021import com.google.gson.JsonElement; 022import com.google.gson.JsonObject; 023import com.google.gson.JsonParser; 024import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.CloudWatchEvent; 025import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.DetailType; 026import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.Header; 027 028import java.io.ByteArrayOutputStream; 029import java.io.IOException; 030import java.io.InputStream; 031import java.io.UncheckedIOException; 032import java.time.ZonedDateTime; 033import java.util.HashMap; 034import java.util.Map; 035import java.util.function.BiConsumer; 036 037import static java.nio.charset.StandardCharsets.UTF_8; 038 039/** 040 * Main entry point of the API—all users of this library will create and configure an instance of this class. 041 * <p> 042 * A typical usage of this class in a Lambda function is: 043 * </p> 044 * <pre> 045 * ... 046 * void myLambdaFunc(InputStream message, Context context) { 047 * new CloudWatchEventDispatcher(message, context) 048 * .withEventHandler(MyEvent1.class, new MyEvent1Handler()) 049 * .withEventHandler(MyEvent2.class, new MyEvent2Handler()) 050 * .logMessage() 051 * .dispatch(); 052 * } 053 * ... 054 * </pre> 055 */ 056public class CloudWatchEventDispatcher { 057 private final String message; 058 private final Context context; 059 private final Map<String, BiConsumer<?, Context>> handlers; 060 private final Map<String, Class<? extends CloudWatchEvent>> eventTypes; 061 062 private boolean logRawMessage; 063 064 /** 065 * Creates a dispatcher for a CloudWatch event, ready for further configuration. 066 * 067 * @param message raw message stream, presumed to contain CloudWatch event JSON 068 * @param context AWS Lambda context, to be passed to handlers as they are invoked 069 */ 070 public CloudWatchEventDispatcher(InputStream message, Context context) { 071 this.message = readQuickly(message).trim(); 072 this.context = context; 073 this.handlers = new HashMap<>(); 074 this.eventTypes = new HashMap<>(); 075 } 076 077 /** 078 * Registers an event handler for a particular CloudWatch event type. 079 * <p> 080 * All classes passed in {@code eventType} must be annotated with {@link DetailType}. See the general description 081 * of that annotation, and of {@link CloudWatchEvent}, for details. 082 * 083 * @param eventType class to which events will be unmarshalled 084 * @param handler consumer of events of {@code eventType} type 085 * @param <T> type of event, with type bounds ensuring compatibility between {@code eventType} and 086 * {@code handler} 087 * @return a reference to this object 088 */ 089 public <T extends CloudWatchEvent> CloudWatchEventDispatcher withEventHandler( 090 Class<T> eventType, BiConsumer<? super T, Context> handler) { 091 092 DetailType detailType = eventType.getAnnotation(DetailType.class); 093 094 if (detailType == null) { 095 throw new IllegalArgumentException( 096 "Cannot register event handler for event type " + eventType.getName() + 097 "; event type not annotated with " + DetailType.class.getName() 098 ); 099 } 100 101 String typeDescription = detailType.value(); 102 103 eventTypes.put(typeDescription, eventType); 104 handlers.put(typeDescription, handler); 105 106 return this; 107 } 108 109 /** 110 * Instructs the dispatcher to log the incoming message, before it is parsed as JSON. 111 * <p> 112 * Activating logging via this method is primarily useful for troubleshooting errors where an AWS Lambda may be 113 * receiving messages <em>other than</em> CloudWatch events. CloudWatch will only send valid JSON documents, 114 * but it is possible to (erroneously) connect a Lambda function expecting CloudWatch events to another event 115 * source. 116 * <p> 117 * Logging is done via the {@link com.amazonaws.services.lambda.runtime.LambdaLogger LambdaLogger} obtained from 118 * {@link Context#getLogger()}. CloudWatch Logs must be enabled for the Lambda, and the Lambda's role must have 119 * the necessary permissions to write to CloudWatch Logs. 120 * 121 * @return a reference to this object 122 */ 123 public CloudWatchEventDispatcher logMessage() { 124 logRawMessage = true; 125 126 return this; 127 } 128 129 /** 130 * Dispatches the event to registered handlers. 131 * <p> 132 * This is the terminal operation of the dispatcher. All settings specified via the other methods of this class are 133 * applied at this time. 134 * 135 * @throws IllegalArgumentException if the message is not valid JSON, or if the message is missing 136 * {@code detail-type} or {@code detail} properties, or if the message's 137 * {@code detail-type} does not correspond to any event types registered via 138 * {@link #withEventHandler(Class, BiConsumer) withEventHandler()} 139 */ 140 public void dispatch() { 141 if (logRawMessage) { 142 context.getLogger().log("Raw message: " + message); 143 } 144 145 JsonObject jsonObject; 146 try { 147 jsonObject = new JsonParser().parse(message).getAsJsonObject(); 148 } catch (Exception e) { 149 throw new IllegalArgumentException("Failed to parse message as JSON", e); 150 } 151 152 Gson gson = new GsonBuilder() 153 .registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeAdapter()) 154 .create(); 155 156 JsonElement detailType = jsonObject.get("detail-type"); 157 if (detailType == null) { 158 throw new IllegalArgumentException( 159 "Received message is not CloudWatch event (missing \"detail-type\" property)" 160 ); 161 } 162 163 JsonElement detail = jsonObject.get("detail"); 164 if (detail == null) { 165 throw new IllegalArgumentException( 166 "Received message is not CloudWatch event (missing \"detail\" property)" 167 ); 168 } 169 170 String typeDescription = detailType.getAsString(); 171 172 Class<? extends CloudWatchEvent> eventType = eventTypes.get(typeDescription); 173 if (eventType == null) { 174 throw new IllegalArgumentException( 175 "Received event of unknown type; detail-type field in message: " + typeDescription 176 ); 177 } 178 179 Header header = gson.fromJson(jsonObject, Header.class); 180 CloudWatchEvent event = gson.fromJson(detail, eventType); 181 event.setHeader(header); 182 183 // The generic type bounds used on the method that stores key/value pairs in the map make this type-safe 184 @SuppressWarnings("unchecked") 185 BiConsumer<Object, Context> handler = 186 (BiConsumer<Object, Context>) handlers.get(typeDescription); 187 188 handler.accept(event, context); 189 } 190 191 private static String readQuickly(InputStream stream) { 192 try { 193 byte[] bytes = new byte[stream.available()]; 194 195 //noinspection ResultOfMethodCallIgnored 196 stream.read(bytes); 197 int next = stream.read(); 198 199 if (next < 0) { 200 return new String(bytes, UTF_8); 201 } 202 203 return readSlowly(bytes, next, stream); 204 } catch (IOException e) { 205 throw new UncheckedIOException(e); 206 } finally { 207 try { 208 stream.close(); 209 } catch (IOException ignore) { 210 } 211 } 212 } 213 214 private static String readSlowly(byte[] initial, int next, InputStream stream) throws IOException { 215 ByteArrayOutputStream outputStream = new ByteArrayOutputStream(initial.length + 1 + 65_536); 216 byte[] buffer = new byte[65_536]; 217 218 outputStream.write(initial); 219 outputStream.write(next); 220 221 int read; 222 while ((read = stream.read(buffer)) >= 0) { 223 outputStream.write(buffer, 0, read); 224 } 225 226 return new String(outputStream.toByteArray(), UTF_8); 227 } 228}