-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #8 from sailthru/LT-984-m-particle-outgoing-lambda…
…-send-message-to-m-particle-endpoint [LT-984] Send messages to mParticle
- Loading branch information
Showing
18 changed files
with
572 additions
and
17 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package com.sailthru.sqs; | ||
|
||
import com.mparticle.ApiClient; | ||
import com.mparticle.client.EventsApi; | ||
import com.mparticle.model.Batch; | ||
import com.mparticle.model.CustomEvent; | ||
import com.mparticle.model.CustomEventData; | ||
import com.mparticle.model.UserIdentities; | ||
import com.sailthru.sqs.exception.RetryLaterException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import retrofit2.Call; | ||
import retrofit2.Response; | ||
|
||
import java.io.IOException; | ||
|
||
public class MParticleClient { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class); | ||
private static final String BASE_URL = "https://inbound.mparticle.com/s2s/v2/"; | ||
|
||
public void submit(final MParticleMessage message) throws RetryLaterException { | ||
|
||
final Batch batch = prepareBatch(message); | ||
|
||
final EventsApi eventsApi = getEventsApi(message.getAuthenticationKey(), message.getAuthenticationSecret()); | ||
|
||
LOGGER.debug("Attempting to send batch: {} for message: {}", batch, message); | ||
|
||
final Call<Void> singleResult = eventsApi.uploadEvents(batch); | ||
|
||
try { | ||
final Response<Void> response = singleResult.execute(); | ||
LOGGER.info("Received response code: {}", response.code()); | ||
|
||
if (!response.isSuccessful()) { | ||
throw new RetryLaterException(); | ||
} | ||
|
||
LOGGER.info("Successfully sent message: {}", message); | ||
} catch (IOException e) { | ||
//Retry for all IOExceptions | ||
throw new RetryLaterException(e); | ||
} | ||
} | ||
|
||
private EventsApi getEventsApi(final String apiKey, final String apiSecret) { | ||
final ApiClient apiClient = new ApiClient(apiKey, apiSecret); | ||
|
||
apiClient.getAdapterBuilder().baseUrl(BASE_URL); | ||
|
||
return apiClient.createService(EventsApi.class); | ||
} | ||
|
||
private Batch prepareBatch(final MParticleMessage message) { | ||
final Batch batch = new Batch(); | ||
batch.environment(Batch.Environment.DEVELOPMENT); | ||
batch.userIdentities(new UserIdentities() | ||
.email(message.getProfileEmail()) | ||
); | ||
|
||
final CustomEvent event = new CustomEvent().data( | ||
new CustomEventData() | ||
.eventName(message.getEventName()) | ||
.customEventType(CustomEventData.CustomEventType.valueOf(message.getEventType())) | ||
); | ||
|
||
event.getData().customAttributes(message.getAdditionalData()); | ||
|
||
batch.addEventsItem(event); | ||
|
||
return batch; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package com.sailthru.sqs; | ||
|
||
import java.util.Map; | ||
|
||
public class MParticleMessage { | ||
private String authenticationKey; | ||
private String authenticationSecret; | ||
private String eventName; | ||
private String eventType; | ||
private Map<String, String> additionalData; | ||
private String profileEmail; | ||
|
||
public String getAuthenticationKey() { | ||
return authenticationKey; | ||
} | ||
|
||
public String getAuthenticationSecret() { | ||
return authenticationSecret; | ||
} | ||
|
||
public String getEventName() { | ||
return eventName; | ||
} | ||
|
||
public String getEventType() { | ||
return eventType; | ||
} | ||
|
||
public Map<String, String> getAdditionalData() { | ||
return additionalData; | ||
} | ||
|
||
public String getProfileEmail() { | ||
return profileEmail; | ||
} | ||
|
||
public void setAuthenticationKey(String authenticationKey) { | ||
this.authenticationKey = authenticationKey; | ||
} | ||
|
||
public void setAuthenticationSecret(String authenticationSecret) { | ||
this.authenticationSecret = authenticationSecret; | ||
} | ||
|
||
public void setEventName(String eventName) { | ||
this.eventName = eventName; | ||
} | ||
|
||
public void setEventType(String eventType) { | ||
this.eventType = eventType; | ||
} | ||
|
||
public void setAdditionalData(Map<String, String> additionalData) { | ||
this.additionalData = additionalData; | ||
} | ||
|
||
public void setProfileEmail(String profileEmail) { | ||
this.profileEmail = profileEmail; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "MParticleMessage{" + | ||
"authenticationKey='" + authenticationKey + '\'' + | ||
", authenticationSecret='" + authenticationSecret + '\'' + | ||
", eventName='" + eventName + '\'' + | ||
", eventType='" + eventType + '\'' + | ||
", additionalData=" + additionalData + | ||
", profileEmail='" + profileEmail + '\'' + | ||
'}'; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package com.sailthru.sqs; | ||
|
||
import com.amazonaws.services.lambda.runtime.events.SQSEvent; | ||
import com.sailthru.sqs.exception.AuthenticationKeyNotProvidedException; | ||
import com.sailthru.sqs.exception.AuthenticationSecretNotProvidedException; | ||
import com.sailthru.sqs.exception.NoRetryException; | ||
import com.sailthru.sqs.exception.RetryLaterException; | ||
import com.sailthru.sqs.exception.UnparseablePayloadException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.utils.StringUtils; | ||
|
||
import java.io.IOException; | ||
|
||
public class MessageProcessor { | ||
private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class); | ||
|
||
private MessageSerializer messageSerializer; | ||
private MParticleClient mParticleClient; | ||
|
||
public MessageProcessor() { | ||
messageSerializer = new MessageSerializer(); | ||
mParticleClient = new MParticleClient(); | ||
} | ||
|
||
public void process(final SQSEvent.SQSMessage sqsMessage) throws RetryLaterException, NoRetryException { | ||
final String rawMessage = sqsMessage.getBody(); | ||
LOGGER.debug("Received message: {}", rawMessage); | ||
|
||
final MParticleMessage message = parseAndValidateMessage(rawMessage); | ||
|
||
getMParticleClient().submit(message); | ||
} | ||
|
||
private MParticleMessage parseAndValidateMessage(final String rawMessage) throws NoRetryException { | ||
try { | ||
final MParticleMessage message = getSerializer().deserialize(rawMessage, MParticleMessage.class); | ||
|
||
if (StringUtils.isEmpty(message.getAuthenticationKey())) { | ||
throw new AuthenticationKeyNotProvidedException("Authentication key not provided."); | ||
} | ||
|
||
if (StringUtils.isEmpty(message.getAuthenticationSecret())) { | ||
throw new AuthenticationSecretNotProvidedException("Authentication secret not provided."); | ||
} | ||
|
||
return message; | ||
} catch (IOException e) { | ||
throw new UnparseablePayloadException(String.format("Could not deserialize message: %s", rawMessage), e); | ||
} | ||
} | ||
|
||
private MParticleClient getMParticleClient() { | ||
return mParticleClient; | ||
} | ||
|
||
private MessageSerializer getSerializer() { | ||
return messageSerializer; | ||
} | ||
|
||
public void setMParticleClient(final MParticleClient mParticleClient) { | ||
this.mParticleClient = mParticleClient; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package com.sailthru.sqs; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import java.io.IOException; | ||
|
||
public final class MessageSerializer { | ||
private final ObjectMapper mapper = new ObjectMapper(); | ||
|
||
public <T> T deserialize(String content, Class<T> valueType) throws IOException { | ||
return this.mapper.readValue(content, valueType); | ||
} | ||
|
||
public String serialize(Object newMessage) throws JsonProcessingException { | ||
return this.mapper.writeValueAsString(newMessage); | ||
} | ||
} |
Oops, something went wrong.