diff --git a/.buildscript/deploy_snapshot.sh b/.buildscript/deploy_snapshot.sh
index e02d3c73..2d98a91b 100755
--- a/.buildscript/deploy_snapshot.sh
+++ b/.buildscript/deploy_snapshot.sh
@@ -23,6 +23,6 @@ elif [ "$CIRCLE_BRANCH" != "$BRANCH" ]; then
echo "Skipping snapshot deployment: wrong branch. Expected '$BRANCH' but was '$CIRCLE_BRANCH'."
else
echo "Deploying snapshot..."
- mvn clean source:jar javadoc:jar deploy --settings=".buildscript/settings.xml" -Dmaven.test.skip=true
+ mvn clean source:jar javadoc:jar deploy --settings=".buildscript/settings.xml" -Dmaven.test.skip=true -Dgpg.skip=true
echo "Snapshot deployed!"
fi
diff --git a/.buildscript/settings.xml b/.buildscript/settings.xml
index 4f18cc59..81d298ff 100644
--- a/.buildscript/settings.xml
+++ b/.buildscript/settings.xml
@@ -1,9 +1,9 @@
+ You can't fix what you can't measure
+
* BlockingFlush blockingFlush = BlockingFlush.create();
* Analytics analytics = Analytics.builder(writeKey)
- * .plugin(blockingFlush)
+ * .plugin(blockingFlush.plugin())
* .build();
*
* // Do some work.
@@ -55,12 +55,12 @@ public boolean transform(MessageBuilder builder) {
new Callback() {
@Override
public void success(Message message) {
- phaser.arrive();
+ phaser.arriveAndDeregister();
}
@Override
public void failure(Message message, Throwable throwable) {
- phaser.arrive();
+ phaser.arriveAndDeregister();
}
});
}
diff --git a/analytics-sample/src/main/java/sample/Main.java b/analytics-sample/src/main/java/sample/Main.java
index cfad1468..77204369 100644
--- a/analytics-sample/src/main/java/sample/Main.java
+++ b/analytics-sample/src/main/java/sample/Main.java
@@ -1,6 +1,5 @@
package sample;
-import com.jakewharton.retrofit.Ok3Client;
import com.segment.analytics.Analytics;
import com.segment.analytics.messages.TrackMessage;
import java.util.LinkedHashMap;
@@ -9,7 +8,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.OkHttpClient;
-import retrofit.client.Client;
public class Main {
public static void main(String... args) throws Exception {
@@ -49,13 +47,12 @@ public static void main(String... args) throws Exception {
* customize the client to your needs. For instance, this client is configured to automatically
* gzip outgoing requests.
*/
- private static Client createClient() {
- return new Ok3Client(
- new OkHttpClient.Builder()
- .connectTimeout(15, TimeUnit.SECONDS)
- .readTimeout(15, TimeUnit.SECONDS)
- .writeTimeout(15, TimeUnit.SECONDS)
- .addInterceptor(new GzipRequestInterceptor())
- .build());
+ private static OkHttpClient createClient() {
+ return new OkHttpClient.Builder()
+ .connectTimeout(15, TimeUnit.SECONDS)
+ .readTimeout(15, TimeUnit.SECONDS)
+ .writeTimeout(15, TimeUnit.SECONDS)
+ .addInterceptor(new GzipRequestInterceptor())
+ .build();
}
}
diff --git a/analytics-spring-boot-starter/pom.xml b/analytics-spring-boot-starter/pom.xml
index eb8478c0..4ebc8959 100644
--- a/analytics-spring-boot-starter/pom.xml
+++ b/analytics-spring-boot-starter/pom.xml
@@ -6,12 +6,40 @@
com.segment.analytics.java
analytics-parent
- 2.1.2-SNAPSHOT
+ 3.5.5-SNAPSHOT
+ com.segment.analytics.java
analytics-spring-boot-starter
+ 3.5.5-SNAPSHOT
Spring Boot starter for Segment Analytics for Java
+ Spring Boot auto-configuration for Segment Analytics for Java.
+ https://github.com/segmentio/analytics-java
+
+
+
+ The MIT License (MIT)
+ http://opensource.org/licenses/MIT
+
+
+
+
+ https://github.com/segmentio/analytics-java/
+ scm:git:https://github.com/segmentio/analytics-java.git
+ scm:git:git@github.com:segmentio/analytics-java.git
+ analytics-parent-3.5.4
+
+
+
+
+ segment
+ Segment
+ Segment
+ https://segment.com
+
+
+
diff --git a/analytics/pom.xml b/analytics/pom.xml
index 7a67a3aa..05e36fbb 100644
--- a/analytics/pom.xml
+++ b/analytics/pom.xml
@@ -6,12 +6,40 @@
com.segment.analytics.java
analytics-parent
- 2.1.2-SNAPSHOT
+ 3.5.5-SNAPSHOT
+ com.segment.analytics.java
analytics
+ 3.5.5-SNAPSHOT
Analytics for Java
+ The hassle-free way to add analytics to your Java app.
+ https://github.com/segmentio/analytics-java
+
+
+
+ The MIT License (MIT)
+ http://opensource.org/licenses/MIT
+
+
+
+
+ https://github.com/segmentio/analytics-java/
+ scm:git:https://github.com/segmentio/analytics-java.git
+ scm:git:git@github.com:segmentio/analytics-java.git
+ analytics-parent-3.5.4
+
+
+
+
+ segment
+ Segment
+ Segment
+ https://segment.com
+
+
+
com.segment.analytics.java
@@ -21,42 +49,69 @@
com.squareup.okhttp3
okhttp
+ ${okhttp.version}
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+ ${logging.version}
+
+
+ com.squareup.retrofit2
+ converter-gson
+ 2.9.0
+
+
+ com.squareup.retrofit2
+ retrofit-mock
+ ${retrofit.version}
com.google.code.findbugs
findbugs
+ ${findbugs.version}
provided
com.segment.backo
backo
+ ${backo.version}
-
junit
junit
+ ${junit.version}
test
com.squareup.burst
burst-junit4
+ ${burst.version}
test
com.squareup.burst
burst
+ ${burst.version}
test
org.assertj
assertj-core
+ ${assertj.version}
test
org.mockito
mockito-core
+ ${mockito.version}
test
+
+ jakarta.annotation
+ jakarta.annotation-api
+ 2.1.1
+
@@ -64,7 +119,7 @@
org.codehaus.mojo
templating-maven-plugin
- 1.0-alpha-3
+ 1.0.0
@@ -75,5 +130,4 @@
-
diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java
index ffc19c8d..9b88de8c 100644
--- a/analytics/src/main/java/com/segment/analytics/Analytics.java
+++ b/analytics/src/main/java/com/segment/analytics/Analytics.java
@@ -4,23 +4,28 @@
import com.google.gson.GsonBuilder;
import com.segment.analytics.gson.AutoValueAdapterFactory;
import com.segment.analytics.gson.ISO8601DateAdapter;
+import com.segment.analytics.gson.ISO8601InstantAdapter;
import com.segment.analytics.http.SegmentService;
import com.segment.analytics.internal.AnalyticsClient;
import com.segment.analytics.internal.AnalyticsVersion;
import com.segment.analytics.messages.Message;
import com.segment.analytics.messages.MessageBuilder;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import retrofit.Endpoint;
-import retrofit.Endpoints;
-import retrofit.RestAdapter;
-import retrofit.client.Client;
-import retrofit.converter.GsonConverter;
+import okhttp3.ConnectionSpec;
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.TlsVersion;
+import okhttp3.logging.HttpLoggingInterceptor;
+import retrofit2.Retrofit;
+import retrofit2.converter.gson.GsonConverterFactory;
/**
* The entry point into the Segment for Java library.
@@ -65,11 +70,50 @@ public static Builder builder(String writeKey) {
/** Enqueue the given message to be uploaded to Segment's servers. */
public void enqueue(MessageBuilder builder) {
+ Message message = buildMessage(builder);
+ if (message == null) {
+ return;
+ }
+ client.enqueue(message);
+ }
+
+ /**
+ * Inserts the message into queue if it is possible to do so immediately without violating
+ * capacity restrictions, returning {@code true} upon success and {@code false} if no space is
+ * currently available.
+ *
+ * @param builder
+ */
+ public boolean offer(MessageBuilder builder) {
+ Message message = buildMessage(builder);
+ if (message == null) {
+ return false;
+ }
+ return client.offer(message);
+ }
+
+ /** Flush events in the message queue. */
+ public void flush() {
+ client.flush();
+ }
+
+ /** Stops this instance from processing further requests. */
+ public void shutdown() {
+ client.shutdown();
+ }
+
+ /**
+ * Helper method to build message
+ *
+ * @param builder
+ * @return Instance of Message if a valid message can be built; null if skipping this message
+ */
+ private Message buildMessage(MessageBuilder builder) {
for (MessageTransformer messageTransformer : messageTransformers) {
boolean shouldContinue = messageTransformer.transform(builder);
if (!shouldContinue) {
log.print(Log.Level.VERBOSE, "Skipping message %s.", builder);
- return;
+ return null;
}
}
Message message = builder.build();
@@ -77,40 +121,37 @@ public void enqueue(MessageBuilder builder) {
message = messageInterceptor.intercept(message);
if (message == null) {
log.print(Log.Level.VERBOSE, "Skipping message %s.", builder);
- return;
+ return null;
}
}
- client.enqueue(message);
- }
-
- /** Flush events in the message queue. */
- public void flush() {
- client.flush();
- }
-
- /** Stops this instance from processing further requests. */
- public void shutdown() {
- client.shutdown();
+ return message;
}
/** Fluent API for creating {@link Analytics} instances. */
public static class Builder {
- private static final Endpoint DEFAULT_ENDPOINT =
- Endpoints.newFixedEndpoint("https://api.segment.io");
+ private static final String DEFAULT_ENDPOINT = "https://api.segment.io";
+ private static final String DEFAULT_PATH = "/v1/import/";
private static final String DEFAULT_USER_AGENT = "analytics-java/" + AnalyticsVersion.get();
+ private static final int MESSAGE_QUEUE_MAX_BYTE_SIZE = 1024 * 500;
private final String writeKey;
- private Client client;
+ private OkHttpClient client;
private Log log;
- private Endpoint endpoint;
+ public HttpUrl endpoint;
+ public HttpUrl uploadURL;
private String userAgent = DEFAULT_USER_AGENT;
private List messageTransformers;
private List messageInterceptors;
private ExecutorService networkExecutor;
private ThreadFactory threadFactory;
private int flushQueueSize;
+ private int maximumFlushAttempts;
+ private int maximumQueueSizeInBytes;
private long flushIntervalInMillis;
private List callbacks;
+ private int queueCapacity;
+ private boolean forceTlsV1 = false;
+ private GsonBuilder gsonBuilder;
Builder(String writeKey) {
if (writeKey == null || writeKey.trim().length() == 0) {
@@ -120,7 +161,7 @@ public static class Builder {
}
/** Set a custom networking client. */
- public Builder client(Client client) {
+ public Builder client(OkHttpClient client) {
if (client == null) {
throw new NullPointerException("Null client");
}
@@ -138,14 +179,26 @@ public Builder log(Log log) {
}
/**
- * Set an endpoint that this client should upload events to. Uses {@code https://api.segment.io}
- * by default.
+ * Set an endpoint (host only) that this client should upload events to. Uses {@code
+ * https://api.segment.io} by default.
*/
public Builder endpoint(String endpoint) {
if (endpoint == null || endpoint.trim().length() == 0) {
throw new NullPointerException("endpoint cannot be null or empty.");
}
- this.endpoint = Endpoints.newFixedEndpoint(endpoint);
+ this.endpoint = HttpUrl.parse(endpoint + DEFAULT_PATH);
+ return this;
+ }
+
+ /**
+ * Set an endpoint (host and prefix) that this client should upload events to. Uses {@code
+ * https://api.segment.io/v1} by default.
+ */
+ public Builder setUploadURL(String uploadURL) {
+ if (uploadURL == null || uploadURL.trim().length() == 0) {
+ throw new NullPointerException("Upload URL cannot be null or empty.");
+ }
+ this.uploadURL = HttpUrl.parse(uploadURL);
return this;
}
@@ -190,6 +243,28 @@ public Builder messageInterceptor(MessageInterceptor interceptor) {
return this;
}
+ /** Set queue capacity */
+ public Builder queueCapacity(int capacity) {
+ if (capacity <= 0) {
+ throw new IllegalArgumentException("capacity should be positive.");
+ }
+ this.queueCapacity = capacity;
+ return this;
+ }
+
+ public Builder gsonBuilder(GsonBuilder gsonBuilder) {
+ if (gsonBuilder == null) {
+ throw new NullPointerException("Null gsonBuilder");
+ }
+
+ if (this.gsonBuilder != null) {
+ throw new IllegalStateException("gsonBuilder is already registered.");
+ }
+
+ this.gsonBuilder = gsonBuilder;
+ return this;
+ }
+
/** Set the queueSize at which flushes should be triggered. */
@Beta
public Builder flushQueueSize(int flushQueueSize) {
@@ -200,6 +275,17 @@ public Builder flushQueueSize(int flushQueueSize) {
return this;
}
+ /** Set the queueSize at which flushes should be triggered. */
+ @Beta
+ public Builder maximumQueueSizeInBytes(int bytes) {
+ if (bytes < 1) {
+ throw new IllegalArgumentException("maximumQueueSizeInBytes must not be less than 1.");
+ }
+
+ this.maximumQueueSizeInBytes = bytes;
+ return this;
+ }
+
/** Set the interval at which the queue should be flushed. */
@Beta
public Builder flushInterval(long flushInterval, TimeUnit unit) {
@@ -211,6 +297,15 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) {
return this;
}
+ /** Set how many retries should happen before getting exhausted */
+ public Builder retries(int maximumRetries) {
+ if (maximumRetries < 1) {
+ throw new IllegalArgumentException("retries must be at least 1");
+ }
+ this.maximumFlushAttempts = maximumRetries;
+ return this;
+ }
+
/** Set the {@link ExecutorService} on which all HTTP requests will be made. */
public Builder networkExecutor(ExecutorService networkExecutor) {
if (networkExecutor == null) {
@@ -255,29 +350,55 @@ public Builder plugin(Plugin plugin) {
return this;
}
+ /** Specify if need TlsV1 */
+ public Builder forceTlsVersion1() {
+ forceTlsV1 = true;
+ return this;
+ }
+
/** Create a {@link Analytics} client. */
public Analytics build() {
- Gson gson =
- new GsonBuilder() //
- .registerTypeAdapterFactory(new AutoValueAdapterFactory()) //
- .registerTypeAdapter(Date.class, new ISO8601DateAdapter()) //
- .create();
+ if (gsonBuilder == null) {
+ gsonBuilder = new GsonBuilder();
+ }
+
+ gsonBuilder
+ .registerTypeAdapterFactory(new AutoValueAdapterFactory())
+ .registerTypeAdapter(Instant.class, new ISO8601InstantAdapter())
+ .registerTypeAdapter(Date.class, new ISO8601DateAdapter());
+
+ Gson gson = gsonBuilder.create();
if (endpoint == null) {
- endpoint = DEFAULT_ENDPOINT;
+ if (uploadURL != null) {
+ endpoint = uploadURL;
+ } else {
+ endpoint = HttpUrl.parse(DEFAULT_ENDPOINT + DEFAULT_PATH);
+ }
}
+
if (client == null) {
client = Platform.get().defaultClient();
}
+
if (log == null) {
log = Log.NONE;
}
if (flushIntervalInMillis == 0) {
flushIntervalInMillis = Platform.get().defaultFlushIntervalInMillis();
}
+ if (queueCapacity == 0) {
+ queueCapacity = Integer.MAX_VALUE;
+ }
if (flushQueueSize == 0) {
flushQueueSize = Platform.get().defaultFlushQueueSize();
}
+ if (maximumQueueSizeInBytes == 0) {
+ maximumQueueSizeInBytes = MESSAGE_QUEUE_MAX_BYTE_SIZE;
+ }
+ if (maximumFlushAttempts == 0) {
+ maximumFlushAttempts = 3;
+ }
if (messageTransformers == null) {
messageTransformers = Collections.emptyList();
} else {
@@ -300,33 +421,60 @@ public Analytics build() {
callbacks = Collections.unmodifiableList(callbacks);
}
- RestAdapter restAdapter =
- new RestAdapter.Builder()
- .setConverter(new GsonConverter(gson))
- .setEndpoint(endpoint)
- .setClient(client)
- .setRequestInterceptor(new AnalyticsRequestInterceptor(writeKey, userAgent))
- .setLogLevel(RestAdapter.LogLevel.FULL)
- .setLog(
- new RestAdapter.Log() {
- @Override
- public void log(String message) {
- log.print(Log.Level.VERBOSE, "%s", message);
- }
- })
+ HttpLoggingInterceptor interceptor =
+ new HttpLoggingInterceptor(
+ new HttpLoggingInterceptor.Logger() {
+ @Override
+ public void log(String message) {
+ log.print(Log.Level.VERBOSE, "%s", message);
+ }
+ });
+
+ interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
+
+ OkHttpClient.Builder builder =
+ client
+ .newBuilder()
+ .addInterceptor(new AnalyticsRequestInterceptor(userAgent))
+ .addInterceptor(interceptor);
+
+ if (forceTlsV1) {
+ ConnectionSpec connectionSpec =
+ new ConnectionSpec.Builder(ConnectionSpec.MODERN_TLS)
+ .tlsVersions(
+ TlsVersion.TLS_1_0, TlsVersion.TLS_1_1, TlsVersion.TLS_1_2, TlsVersion.TLS_1_3)
+ .build();
+
+ builder = builder.connectionSpecs(Arrays.asList(connectionSpec));
+ }
+
+ client = builder.build();
+
+ Retrofit restAdapter =
+ new Retrofit.Builder()
+ .addConverterFactory(GsonConverterFactory.create(gson))
+ .baseUrl(DEFAULT_ENDPOINT)
+ .client(client)
.build();
SegmentService segmentService = restAdapter.create(SegmentService.class);
AnalyticsClient analyticsClient =
AnalyticsClient.create(
+ endpoint,
segmentService,
+ queueCapacity,
flushQueueSize,
flushIntervalInMillis,
+ maximumFlushAttempts,
+ maximumQueueSizeInBytes,
log,
threadFactory,
networkExecutor,
- callbacks);
+ callbacks,
+ writeKey,
+ gson);
+
return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log);
}
}
diff --git a/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java b/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java
index 8148fba5..ff50fc3b 100644
--- a/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java
+++ b/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java
@@ -1,24 +1,24 @@
package com.segment.analytics;
-import javax.annotation.Nonnull;
-import okhttp3.Credentials;
-import retrofit.RequestInterceptor;
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import okhttp3.Interceptor;
+import okhttp3.Request;
-class AnalyticsRequestInterceptor implements RequestInterceptor {
- private static final String AUTHORIZATION_HEADER = "Authorization";
+class AnalyticsRequestInterceptor implements Interceptor {
private static final String USER_AGENT_HEADER = "User-Agent";
- private final @Nonnull String writeKey;
private final @Nonnull String userAgent;
- AnalyticsRequestInterceptor(@Nonnull String writeKey, @Nonnull String userAgent) {
- this.writeKey = writeKey;
+ AnalyticsRequestInterceptor(@Nonnull String userAgent) {
this.userAgent = userAgent;
}
@Override
- public void intercept(RequestFacade request) {
- request.addHeader(AUTHORIZATION_HEADER, Credentials.basic(writeKey, ""));
- request.addHeader(USER_AGENT_HEADER, userAgent);
+ public okhttp3.Response intercept(Chain chain) throws IOException {
+ Request request = chain.request();
+ Request newRequest = request.newBuilder().addHeader(USER_AGENT_HEADER, userAgent).build();
+
+ return chain.proceed(newRequest);
}
}
diff --git a/analytics/src/main/java/com/segment/analytics/Platform.java b/analytics/src/main/java/com/segment/analytics/Platform.java
index 2da0e9da..a5de5cb5 100644
--- a/analytics/src/main/java/com/segment/analytics/Platform.java
+++ b/analytics/src/main/java/com/segment/analytics/Platform.java
@@ -2,13 +2,11 @@
import static java.lang.Thread.MIN_PRIORITY;
-import com.jakewharton.retrofit.Ok3Client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
-import retrofit.client.Client;
class Platform {
static final String THREAD_NAME = "Analytics";
@@ -23,14 +21,14 @@ private static Platform findPlatform() {
return new Platform();
}
- Client defaultClient() {
+ OkHttpClient defaultClient() {
OkHttpClient client =
new OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.build();
- return new Ok3Client(client);
+ return client;
}
ExecutorService defaultNetworkExecutor() {
diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java
index 80df17ac..2430cd1e 100644
--- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java
+++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java
@@ -4,29 +4,48 @@
import static com.segment.analytics.Log.Level.ERROR;
import static com.segment.analytics.Log.Level.VERBOSE;
+import com.google.gson.Gson;
import com.segment.analytics.Callback;
import com.segment.analytics.Log;
import com.segment.analytics.http.SegmentService;
+import com.segment.analytics.http.UploadResponse;
import com.segment.analytics.messages.Batch;
import com.segment.analytics.messages.Message;
import com.segment.backo.Backo;
import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import retrofit.RetrofitError;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import okhttp3.HttpUrl;
+import retrofit2.Call;
+import retrofit2.Response;
public class AnalyticsClient {
private static final Map CONTEXT;
+ private static final int BATCH_MAX_SIZE = 1024 * 500;
+ private static final int MSG_MAX_SIZE = 1024 * 32;
+ private static final Charset ENCODING = StandardCharsets.UTF_8;
+ private Gson gsonInstance;
+ private static final String instanceId = UUID.randomUUID().toString();
+ private static final int WAIT_FOR_THREAD_COMPLETE_S = 5;
+ private static final int TERMINATION_TIMEOUT_S = 1;
static {
Map library = new LinkedHashMap<>();
@@ -34,55 +53,91 @@ public class AnalyticsClient {
library.put("version", AnalyticsVersion.get());
Map context = new LinkedHashMap<>();
context.put("library", Collections.unmodifiableMap(library));
+ context.put("instanceId", instanceId);
CONTEXT = Collections.unmodifiableMap(context);
}
private final BlockingQueue messageQueue;
+ private final HttpUrl uploadUrl;
private final SegmentService service;
private final int size;
+ private final int maximumRetries;
+ private final int maximumQueueByteSize;
+ private int currentQueueSizeInBytes;
private final Log log;
private final List callbacks;
private final ExecutorService networkExecutor;
private final ExecutorService looperExecutor;
private final ScheduledExecutorService flushScheduler;
+ private final AtomicBoolean isShutDown;
+ private final String writeKey;
+ private volatile Future> looperFuture;
public static AnalyticsClient create(
+ HttpUrl uploadUrl,
SegmentService segmentService,
+ int queueCapacity,
int flushQueueSize,
long flushIntervalInMillis,
+ int maximumRetries,
+ int maximumQueueSizeInBytes,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
- List callbacks) {
+ List callbacks,
+ String writeKey,
+ Gson gsonInstance) {
return new AnalyticsClient(
- new LinkedBlockingQueue(),
+ new LinkedBlockingQueue(queueCapacity),
+ uploadUrl,
segmentService,
flushQueueSize,
flushIntervalInMillis,
+ maximumRetries,
+ maximumQueueSizeInBytes,
log,
threadFactory,
networkExecutor,
- callbacks);
+ callbacks,
+ new AtomicBoolean(false),
+ writeKey,
+ gsonInstance);
}
- AnalyticsClient(
+ public AnalyticsClient(
BlockingQueue messageQueue,
+ HttpUrl uploadUrl,
SegmentService service,
int maxQueueSize,
long flushIntervalInMillis,
+ int maximumRetries,
+ int maximumQueueSizeInBytes,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
- List callbacks) {
+ List callbacks,
+ AtomicBoolean isShutDown,
+ String writeKey,
+ Gson gsonInstance) {
this.messageQueue = messageQueue;
+ this.uploadUrl = uploadUrl;
this.service = service;
this.size = maxQueueSize;
+ this.maximumRetries = maximumRetries;
+ this.maximumQueueByteSize = maximumQueueSizeInBytes;
this.log = log;
this.callbacks = callbacks;
this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
this.networkExecutor = networkExecutor;
+ this.isShutDown = isShutDown;
+ this.writeKey = writeKey;
+ this.gsonInstance = gsonInstance;
- looperExecutor.submit(new Looper());
+ this.currentQueueSizeInBytes = 0;
+
+ if (!isShutDown.get()) {
+ this.looperFuture = looperExecutor.submit(new Looper());
+ }
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
flushScheduler.scheduleAtFixedRate(
@@ -97,23 +152,165 @@ public void run() {
TimeUnit.MILLISECONDS);
}
+ public int messageSizeInBytes(Message message) {
+ String stringifiedMessage = gsonInstance.toJson(message);
+
+ return stringifiedMessage.getBytes(ENCODING).length;
+ }
+
+ private Boolean isBackPressuredAfterSize(int incomingSize) {
+ int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
+ int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
+ // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
+ return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
+ }
+
+ public boolean offer(Message message) {
+ return messageQueue.offer(message);
+ }
+
public void enqueue(Message message) {
+ if (message != StopMessage.STOP && isShutDown.get()) {
+ log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message);
+ return;
+ }
+
try {
- messageQueue.put(message);
+ // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if it's a
+ // valid message
+ if (message != StopMessage.STOP && message != FlushMessage.POISON) {
+ int messageByteSize = messageSizeInBytes(message);
+
+ // @jorgen25 check if message is below 32kb limit for individual messages, no need to check
+ // for extra characters
+ if (messageByteSize <= MSG_MAX_SIZE) {
+ if (isBackPressuredAfterSize(messageByteSize)) {
+ this.currentQueueSizeInBytes = messageByteSize;
+ messageQueue.put(FlushMessage.POISON);
+ messageQueue.put(message);
+
+ log.print(VERBOSE, "Maximum storage size has been hit Flushing...");
+ } else {
+ messageQueue.put(message);
+ this.currentQueueSizeInBytes += messageByteSize;
+ }
+ } else {
+ log.print(
+ ERROR, "Message was above individual limit. MessageId: %s", message.messageId());
+ throw new IllegalArgumentException(
+ "Message was above individual limit. MessageId: " + message.messageId());
+ }
+ } else {
+ messageQueue.put(message);
+ }
} catch (InterruptedException e) {
log.print(ERROR, e, "Interrupted while adding message %s.", message);
+ Thread.currentThread().interrupt();
}
}
public void flush() {
- enqueue(FlushMessage.POISON);
+ if (!isShutDown.get()) {
+ enqueue(FlushMessage.POISON);
+ }
}
public void shutdown() {
- messageQueue.clear();
- looperExecutor.shutdownNow();
- flushScheduler.shutdownNow();
- networkExecutor.shutdown(); // Let in-flight requests complete.
+ if (isShutDown.compareAndSet(false, true)) {
+ final long start = System.currentTimeMillis();
+
+ // first let's tell the system to stop
+ enqueue(StopMessage.STOP);
+
+ // we can shutdown the flush scheduler without worrying
+ flushScheduler.shutdownNow();
+
+ // Wait for the looper to complete processing before shutting down executors
+ waitForLooperCompletion();
+ shutdownAndWait(looperExecutor, "looper");
+ shutdownAndWait(networkExecutor, "network");
+
+ log.print(
+ VERBOSE, "Analytics client shut down in %s ms", (System.currentTimeMillis() - start));
+ }
+ }
+
+ /**
+ * Wait for the looper to complete processing all messages before proceeding with shutdown. This
+ * prevents the race condition where the network executor is shut down before the looper finishes
+ * submitting all batches.
+ */
+ private void waitForLooperCompletion() {
+ if (looperFuture != null) {
+ try {
+ // Wait for the looper to complete processing the STOP message and finish
+ // Use a reasonable timeout to avoid hanging indefinitely
+ looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS);
+ log.print(VERBOSE, "Looper completed successfully.");
+ } catch (Exception e) {
+ log.print(ERROR, e, "Error waiting for looper to complete.");
+ // Cancel the looper if it's taking too long or if there's an error
+ if (!looperFuture.isDone()) {
+ looperFuture.cancel(true);
+ log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
+ }
+ }
+ }
+ }
+
+ public void shutdownAndWait(ExecutorService executor, String name) {
+ boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
+ try {
+ executor.shutdown();
+ boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
+ if (terminated) {
+ log.print(VERBOSE, "%s executor terminated normally.", name);
+ return;
+ }
+ if (isLooperExecutor) { // Handle looper - network should finish on its own
+ // not terminated within timeout -> force shutdown
+ log.print(
+ VERBOSE,
+ "%s did not terminate in %d seconds; requesting shutdownNow().",
+ name,
+ TERMINATION_TIMEOUT_S);
+ List dropped = executor.shutdownNow(); // interrupts running tasks
+ log.print(
+ VERBOSE,
+ "%s shutdownNow returned %d queued tasks that never started.",
+ name,
+ dropped.size());
+
+ // optional short wait to give interrupted tasks a chance to exit
+ boolean terminatedAfterForce =
+ executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
+ log.print(
+ VERBOSE,
+ "%s executor %s after shutdownNow().",
+ name,
+ terminatedAfterForce ? "terminated" : "still running (did not terminate)");
+
+ if (!terminatedAfterForce) {
+ // final warning — investigate tasks that ignore interrupts
+ log.print(
+ ERROR,
+ "%s executor still did not terminate; tasks may be ignoring interrupts.",
+ name);
+ }
+ }
+ } catch (InterruptedException e) {
+ // Preserve interrupt status and attempt forceful shutdown
+ log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
+ Thread.currentThread().interrupt();
+ if (isLooperExecutor) {
+ List dropped = executor.shutdownNow();
+ log.print(
+ VERBOSE,
+ "%s shutdownNow invoked after interrupt; %d tasks returned.",
+ name,
+ dropped.size());
+ }
+ }
}
/**
@@ -121,35 +318,91 @@ public void shutdown() {
* messages, it triggers a flush.
*/
class Looper implements Runnable {
+ private boolean stop;
+
+ public Looper() {
+ this.stop = false;
+ }
+
@Override
public void run() {
- List messages = new ArrayList<>();
+ LinkedList messages = new LinkedList<>();
+ AtomicInteger currentBatchSize = new AtomicInteger();
+ boolean batchSizeLimitReached = false;
+ int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length;
try {
- //noinspection InfiniteLoopStatement
- while (true) {
+ while (!stop) {
Message message = messageQueue.take();
- if (message != FlushMessage.POISON) {
- messages.add(message);
- } else if (messages.size() < 1) {
- log.print(VERBOSE, "No messages to flush.");
- continue;
+ if (message == StopMessage.STOP) {
+ log.print(VERBOSE, "Stopping the Looper");
+ stop = true;
+ } else if (message == FlushMessage.POISON) {
+ if (!messages.isEmpty()) {
+ log.print(VERBOSE, "Flushing messages.");
+ }
+ } else {
+ // we do +1 because we are accounting for this new message we just took from the queue
+ // which is not in list yet
+ // need to check if this message is going to make us go over the limit considering
+ // default batch size as well
+ int defaultBatchSize =
+ BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1);
+ int msgSize = messageSizeInBytes(message);
+ if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
+ messages.add(message);
+ currentBatchSize.addAndGet(msgSize);
+ } else {
+ // put message that did not make the cut this time back on the queue, we already took
+ // this message; if we don't put it back it's lost
+ // we take care of that after submitting the batch
+ batchSizeLimitReached = true;
+ }
}
- if (messages.size() >= size || message == FlushMessage.POISON) {
- Batch batch = Batch.create(CONTEXT, messages);
+ Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP;
+ Boolean isOverflow = messages.size() >= size;
+
+ if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) {
+ Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey);
log.print(
VERBOSE,
"Batching %s message(s) into batch %s.",
- messages.size(),
+ batch.batch().size(),
batch.sequence());
- networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch));
- messages = new ArrayList<>();
+ try {
+ networkExecutor.submit(
+ BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
+ } catch (RejectedExecutionException e) {
+ log.print(
+ ERROR,
+ e,
+ "Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
+ batch.sequence());
+ // Notify callbacks about the failure
+ for (Message msg : batch.batch()) {
+ for (Callback callback : callbacks) {
+ callback.failure(msg, e);
+ }
+ }
+ }
+
+ currentBatchSize.set(0);
+ messages.clear();
+ if (batchSizeLimitReached) {
+ // If this is true that means the last message that would make us go over the limit
+ // was not added,
+ // add it to the now-cleared messages list so it's not lost
+ messages.add(message);
+ }
+ batchSizeLimitReached = false;
}
}
} catch (InterruptedException e) {
log.print(DEBUG, "Looper interrupted while polling for messages.");
+ Thread.currentThread().interrupt();
}
+ log.print(VERBOSE, "Looper stopped");
}
}
@@ -160,89 +413,83 @@ static class BatchUploadTask implements Runnable {
.cap(TimeUnit.HOURS, 1) //
.jitter(1) //
.build();
- private static final int MAX_ATTEMPTS = 50; // Max 50 hours ~ 2 days
private final AnalyticsClient client;
private final Backo backo;
final Batch batch;
+ private final int maxRetries;
- static BatchUploadTask create(AnalyticsClient client, Batch batch) {
- return new BatchUploadTask(client, BACKO, batch);
+ static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) {
+ return new BatchUploadTask(client, BACKO, batch, maxRetries);
}
- BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch) {
+ BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) {
this.client = client;
this.batch = batch;
this.backo = backo;
+ this.maxRetries = maxRetries;
+ }
+
+ private void notifyCallbacksWithException(Batch batch, Exception exception) {
+ for (Message message : batch.batch()) {
+ for (Callback callback : client.callbacks) {
+ callback.failure(message, exception);
+ }
+ }
}
/** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */
boolean upload() {
+ client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence());
+
try {
- client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence());
+ Call call = client.service.upload(client.uploadUrl, batch);
+ Response response = call.execute();
- // Ignore return value, UploadResponse#onSuccess will never return false for 200 OK
- client.service.upload(batch);
+ if (response.isSuccessful()) {
+ client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence());
- client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence());
- for (Message message : batch.batch()) {
- for (Callback callback : client.callbacks) {
- callback.success(message);
+ for (Message message : batch.batch()) {
+ for (Callback callback : client.callbacks) {
+ callback.success(message);
+ }
}
+
+ return false;
}
- return false;
- } catch (RetrofitError error) {
- switch (error.getKind()) {
- case NETWORK:
- client.log.print(
- DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence());
- return true;
- case HTTP:
- // Retry 5xx and 429 responses.
- int status = error.getResponse().getStatus();
- if (is5xx(status)) {
- client.log.print(
- DEBUG,
- error,
- "Could not upload batch %s due to server error. Retrying.",
- batch.sequence());
- return true;
- }
- if (status == 429) {
- client.log.print(
- DEBUG,
- error,
- "Could not upload batch %s due to rate limiting. Retrying.",
- batch.sequence());
- return true;
- }
- client.log.print(
- ERROR,
- error,
- "Could not upload batch %s due to HTTP error. Giving up.",
- batch.sequence());
- for (Message message : batch.batch()) {
- for (Callback callback : client.callbacks) {
- callback.failure(message, error);
- }
- }
- return false; // Don't retry
- default:
- client.log.print(
- ERROR, error, "Could not upload batch %s. Giving up.", batch.sequence());
- for (Message message : batch.batch()) {
- for (Callback callback : client.callbacks) {
- callback.failure(message, error);
- }
- }
- return false; // Don't retry
+
+ int status = response.code();
+ if (is5xx(status)) {
+ client.log.print(
+ DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence());
+ return true;
+ } else if (status == 429) {
+ client.log.print(
+ DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence());
+ return true;
}
+
+ client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence());
+ notifyCallbacksWithException(batch, new IOException(response.errorBody().string()));
+
+ return false;
+ } catch (IOException error) {
+ client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence());
+
+ return true;
+ } catch (Exception exception) {
+ client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence());
+
+ notifyCallbacksWithException(batch, exception);
+
+ return false;
}
}
@Override
public void run() {
- for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+ int attempt = 0;
+ for (; attempt <= maxRetries; attempt++) {
boolean retry = upload();
if (!retry) return;
try {
@@ -255,16 +502,65 @@ public void run() {
}
client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence());
- IOException exception = new IOException(MAX_ATTEMPTS + " retries exhausted");
- for (Message message : batch.batch()) {
- for (Callback callback : client.callbacks) {
- callback.failure(message, exception);
- }
- }
+ notifyCallbacksWithException(
+ batch, new IOException(Integer.toString(attempt) + " retries exhausted"));
}
private static boolean is5xx(int status) {
return status >= 500 && status < 600;
}
}
+
+ public static class BatchUtility {
+
+ /**
+ * Method to determine what is the expected default size of the batch regardless of messages
+ *
+ * Sample batch:
+ * {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov
+ * 18, 2021, 2:45:07
+ * PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
+ * "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07
+ * PM","userId":"jorgen25",
+ * "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
+ * "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM",
+ * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
+ * "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM",
+ * "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}],
+ * "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java",
+ * "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
+ *
+ *
+   * total size of batch : 932
+ *
+ *
+   * BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss
+ * tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
+ *
+ *
+   * so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in
+ * this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) +
+ * extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the
+ * same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the
+ * sequence digit which we account for in point 5) 4 -Commas between each message, the total
+ * number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour
+   * in sentAt this time happens to be 2:45 but it could be 12:45) 5 -Sequence Number increments
+ * with every batch created
+ *
+ *
+   * so the formula to determine the expected default size of the batch is
+ *
+ * @return: defaultSize = messages size + context size + metadata size + comma number + sequence
+ * digits + writekey + buffer
+   *
+ */
+ private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) {
+ // sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1,
+ // "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119
+ // Don't need to squeeze everything possible into a batch, adding a buffer
+ int metadataExtraCharsSize = 119 + 1024;
+ int commaNumber = currentMessageNumber - 1;
+
+ return contextSize
+ + metadataExtraCharsSize
+ + commaNumber
+ + String.valueOf(Integer.MAX_VALUE).length();
+ }
+ }
}
diff --git a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java b/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java
index 5c89bb7b..aa3d2689 100644
--- a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java
+++ b/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java
@@ -1,10 +1,10 @@
package com.segment.analytics.internal;
import com.segment.analytics.messages.Message;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
import java.util.Date;
import java.util.Map;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
class FlushMessage implements Message {
static final FlushMessage POISON = new FlushMessage();
@@ -23,6 +23,12 @@ public String messageId() {
throw new UnsupportedOperationException();
}
+ @Nullable
+ @Override
+ public Date sentAt() {
+ throw new UnsupportedOperationException();
+ }
+
@Nonnull
@Override
public Date timestamp() {
diff --git a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java b/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java
new file mode 100644
index 00000000..ad8e1352
--- /dev/null
+++ b/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java
@@ -0,0 +1,66 @@
+package com.segment.analytics.internal;
+
+import com.segment.analytics.messages.Message;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.Date;
+import java.util.Map;
+
+class StopMessage implements Message {
+ static final StopMessage STOP = new StopMessage();
+
+ private StopMessage() {}
+
+ @Nonnull
+ @Override
+ public Type type() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nonnull
+ @Override
+ public String messageId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Date sentAt() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nonnull
+ @Override
+ public Date timestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Map context() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public String anonymousId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public String userId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ @Override
+ public Map integrations() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return "StopMessage{}";
+ }
+}
diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java
index e9f99bc1..31596e90 100644
--- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java
+++ b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java
@@ -2,9 +2,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import com.google.gson.GsonBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -94,6 +97,23 @@ public void nullLog() {
}
}
+ @Test
+ public void invalidRetryCount() {
+ try {
+ builder.retries(0);
+ fail("Should fail for retries less than 1");
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessage("retries must be at least 1");
+ }
+
+ try {
+ builder.retries(-1);
+ fail("Should fail for retries less than 1");
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessage("retries must be at least 1");
+ }
+ }
+
@Test
public void nullTransformer() {
try {
@@ -148,6 +168,34 @@ public void buildsWithValidInterceptor() {
assertThat(analytics).isNotNull();
}
+ @Test
+ public void nullGsonBuilder() {
+ try {
+ builder.gsonBuilder(null);
+ fail("Should fail for null gsonBuilder");
+ } catch (NullPointerException e) {
+ assertThat(e).hasMessage("Null gsonBuilder");
+ }
+ }
+
+ @Test
+ public void duplicateGsonBuilder() {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ try {
+ builder.gsonBuilder(gsonBuilder).gsonBuilder(gsonBuilder);
+ fail("Should fail for duplicate gsonBuilder");
+ } catch (IllegalStateException e) {
+ assertThat(e).hasMessage("gsonBuilder is already registered.");
+ }
+ }
+
+ @Test
+ public void buildsWithValidGsonBuilder() {
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ Analytics analytics = builder.gsonBuilder(gsonBuilder).build();
+ assertThat(analytics).isNotNull();
+ }
+
@Test
public void invalidFlushQueueSize() {
try {
@@ -251,6 +299,72 @@ public void buildsWithValidEndpoint() {
assertThat(analytics).isNotNull();
}
+ @Test
+ public void buildsCorrectEndpoint() {
+ builder.endpoint("https://api.segment.io");
+ String expectedURL = "https://api.segment.io/v1/import/";
+ assertEquals(expectedURL, builder.endpoint.toString());
+ }
+
+ @Test
+ public void buildsWithValidUploadURL() {
+ Analytics analytics = builder.setUploadURL("https://example.com/v2/batch/").build();
+ assertThat(analytics).isNotNull();
+ }
+
+ @Test
+ public void buildsCorrectEndpointWithUploadURL() {
+ builder.setUploadURL("https://dummy.url/api/v1/segment/").build();
+ String expectedURL = "https://dummy.url/api/v1/segment/";
+ assertEquals(expectedURL, builder.endpoint.toString());
+ }
+
+ @Test
+ public void shouldPrioritizeUploadURLOverEndpoint() {
+ builder
+ .endpoint("this wont be set anyway")
+ .setUploadURL("https://dummy.url/api/v1/segment/")
+ .build();
+ String expectedURL = "https://dummy.url/api/v1/segment/";
+
+ assertEquals(expectedURL, builder.uploadURL.toString());
+ assertNotEquals("this wont be set anyway", builder.endpoint.toString());
+ }
+
+ @Test
+ public void buildsCorrectURLWithUploadURL() {
+ builder.setUploadURL("https://example.com/v2/batch/").build();
+ String expectedURL = "https://example.com/v2/batch/";
+ assertEquals(expectedURL, builder.uploadURL.toString());
+ }
+
+ @Test
+ public void nullHostAndPrefixEndpoint() {
+ try {
+ builder.setUploadURL(null);
+ fail("Should fail for null endpoint");
+ } catch (NullPointerException e) {
+ assertThat(e).hasMessage("Upload URL cannot be null or empty.");
+ }
+ }
+
+ @Test
+ public void emptyHostAndPrefixEndpoint() {
+ try {
+ builder.setUploadURL("");
+ fail("Should fail for empty endpoint");
+ } catch (NullPointerException e) {
+ assertThat(e).hasMessage("Upload URL cannot be null or empty.");
+ }
+
+ try {
+ builder.setUploadURL(" ");
+ fail("Should fail for empty endpoint");
+ } catch (NullPointerException e) {
+ assertThat(e).hasMessage("Upload URL cannot be null or empty.");
+ }
+ }
+
@Test
public void nullThreadFactory() {
try {
@@ -293,6 +407,12 @@ public void buildsWithValidCallback() {
assertThat(analytics).isNotNull();
}
+ @Test
+ public void buildsWithForceTlsV1() {
+ Analytics analytics = builder.forceTlsVersion1().build();
+ assertThat(analytics).isNotNull();
+ }
+
@Test
public void multipleCallbacks() {
Analytics analytics =
@@ -317,4 +437,27 @@ public void pluginCanConfigure() {
builder.plugin(plugin);
verify(plugin).configure(builder);
}
+
+ @Test
+ public void invalidQueueCapacity() {
+ try {
+ builder.queueCapacity(0);
+ fail("Should fail when queue capacity is 0");
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessage("capacity should be positive.");
+ }
+
+ try {
+ builder.queueCapacity(-1);
+ fail("Should fail when queue capacity is -1");
+ } catch (IllegalArgumentException e) {
+ assertThat(e).hasMessage("capacity should be positive.");
+ }
+ }
+
+ @Test
+ public void buildWithQueueCapacity() {
+ Analytics analytics = builder.queueCapacity(10).build();
+ assertThat(analytics).isNotNull();
+ }
}
diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java
index 2864541d..5741b6e0 100644
--- a/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java
+++ b/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java
@@ -1,22 +1,34 @@
package com.segment.analytics;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertThat;
-import com.segment.analytics.messages.*;
+import java.io.IOException;
+import okhttp3.Connection;
+import okhttp3.Interceptor.Chain;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.hamcrest.core.Is;
import org.junit.Test;
-import retrofit.RequestInterceptor.RequestFacade;
+import org.mockito.Mock;
public class AnalyticsRequestInterceptorTest {
+ @Mock private Connection mockConnection;
+
@Test
- public void interceptor() {
- RequestFacade requestFacade = mock(RequestFacade.class);
- AnalyticsRequestInterceptor interceptor =
- new AnalyticsRequestInterceptor("writeKey", "userAgent");
+ public void testInterceptor() throws IOException {
+ AnalyticsRequestInterceptor interceptor = new AnalyticsRequestInterceptor("userAgent");
+
+ final Request request = new Request.Builder().url("https://api.segment.io").get().build();
- interceptor.intercept(requestFacade);
+ Chain chain =
+ new ChainAdapter(request, mockConnection) {
+ @Override
+ public Response proceed(Request request) throws IOException {
+ assertThat(request.header("User-Agent"), Is.is("userAgent"));
+ return null;
+ }
+ };
- verify(requestFacade).addHeader("Authorization", "Basic d3JpdGVLZXk6");
- verify(requestFacade).addHeader("User-Agent", "userAgent");
+ interceptor.intercept(chain);
}
}
diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java
index e0dbfdbf..8be3012e 100644
--- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java
+++ b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java
@@ -1,7 +1,9 @@
package com.segment.analytics;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -11,7 +13,13 @@
import com.segment.analytics.messages.Message;
import com.segment.analytics.messages.MessageBuilder;
import com.squareup.burst.BurstJUnit4;
+import java.lang.reflect.Field;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -88,4 +96,55 @@ public void flushIsDispatched() {
verify(client).flush();
}
+
+ @Test
+ public void offerIsDispatched(MessageBuilderTest builder) {
+ MessageBuilder messageBuilder = builder.get().userId("dummy");
+ Message message = messageBuilder.build();
+ when(messageTransformer.transform(messageBuilder)).thenReturn(true);
+ when(messageInterceptor.intercept(any(Message.class))).thenReturn(message);
+
+ analytics.offer(messageBuilder);
+
+ verify(messageTransformer).transform(messageBuilder);
+ verify(messageInterceptor).intercept(any(Message.class));
+ verify(client).offer(message);
+ }
+
+ @Test
+ public void threadSafeTest(MessageBuilderTest builder)
+ throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+ // we want to test if msgs get lost during a multithreaded env
+ Analytics analytics = Analytics.builder("testWriteKeyForIssue321").build();
+ // So we just want to spy on the client of an Analytics object created normally
+ Field clientField = analytics.getClass().getDeclaredField("client");
+ clientField.setAccessible(true);
+ AnalyticsClient spy = spy((AnalyticsClient) clientField.get(analytics));
+ clientField.set(analytics, spy);
+
+ // we are going to run this test for a specific amount of seconds
+ int millisRunning = 200;
+ LocalDateTime initialTime = LocalDateTime.now();
+ LocalDateTime now;
+
+ // and a set number of threads will be using the library
+ ExecutorService service = Executors.newFixedThreadPool(20);
+ AtomicInteger counter = new AtomicInteger();
+
+ MessageBuilder messageBuilder = builder.get().userId("jorgen25");
+
+ do {
+ service.submit(
+ () -> {
+ analytics.enqueue(messageBuilder);
+ counter.incrementAndGet();
+ });
+ now = LocalDateTime.now();
+ } while (initialTime.until(now, ChronoUnit.MILLIS) < millisRunning);
+
+ service.shutdown();
+ while (!service.isShutdown() || !service.isTerminated()) {}
+
+ verify(spy, times(counter.get())).enqueue(any(Message.class));
+ }
}
diff --git a/analytics/src/test/java/com/segment/analytics/ChainAdapter.java b/analytics/src/test/java/com/segment/analytics/ChainAdapter.java
new file mode 100644
index 00000000..ba0a62c7
--- /dev/null
+++ b/analytics/src/test/java/com/segment/analytics/ChainAdapter.java
@@ -0,0 +1,68 @@
+package com.segment.analytics;
+
+import java.util.concurrent.TimeUnit;
+import okhttp3.Call;
+import okhttp3.Connection;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+
+/**
+ * Base class for mock interceptors.
+ *
+ * @author Patrick Bassut
+ */
+abstract class ChainAdapter implements Interceptor.Chain {
+
+ private final Connection mockConnection;
+ private final Request request;
+
+ protected ChainAdapter(Request request, Connection mockConnection) {
+ this.mockConnection = mockConnection;
+ this.request = request;
+ }
+
+ @Override
+ public Request request() {
+ return request;
+ }
+
+ @Override
+ public Connection connection() {
+ return mockConnection;
+ }
+
+ @Override
+ public Call call() {
+ return null;
+ }
+
+ @Override
+ public int connectTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public Interceptor.Chain withConnectTimeout(int timeout, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public int readTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public Interceptor.Chain withReadTimeout(int timeout, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public int writeTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public Interceptor.Chain withWriteTimeout(int timeout, TimeUnit unit) {
+ return null;
+ }
+}
diff --git a/analytics/src/test/java/com/segment/analytics/TypedInterceptorTest.java b/analytics/src/test/java/com/segment/analytics/TypedInterceptorTest.java
index 07470c18..44667460 100644
--- a/analytics/src/test/java/com/segment/analytics/TypedInterceptorTest.java
+++ b/analytics/src/test/java/com/segment/analytics/TypedInterceptorTest.java
@@ -1,7 +1,7 @@
package com.segment.analytics;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.GroupMessage;
@@ -18,26 +18,26 @@ public void messagesFanOutCorrectly() {
AliasMessage alias = AliasMessage.builder("foo").userId("bar").build();
interceptor.intercept(alias);
- verify(interceptor).alias(alias);
+ assertNull(interceptor.alias(alias));
GroupMessage group = GroupMessage.builder("foo").userId("bar").build();
interceptor.intercept(group);
- verify(interceptor).group(group);
+ assertNull(interceptor.group(group));
IdentifyMessage identify = IdentifyMessage.builder().userId("bar").build();
interceptor.intercept(identify);
- verify(interceptor).identify(identify);
+ assertNull(interceptor.identify(identify));
ScreenMessage screen = ScreenMessage.builder("foo").userId("bar").build();
interceptor.intercept(screen);
- verify(interceptor).screen(screen);
+ assertNull(interceptor.screen(screen));
PageMessage page = PageMessage.builder("foo").userId("bar").build();
interceptor.intercept(page);
- verify(interceptor).page(page);
+ assertNull(interceptor.page(page));
TrackMessage track = TrackMessage.builder("foo").userId("bar").build();
interceptor.intercept(track);
- verify(interceptor).track(track);
+ assertNull(interceptor.track(track));
}
}
diff --git a/analytics/src/test/java/com/segment/analytics/TypedTransformerTest.java b/analytics/src/test/java/com/segment/analytics/TypedTransformerTest.java
index 89aeaa83..d57ecba2 100644
--- a/analytics/src/test/java/com/segment/analytics/TypedTransformerTest.java
+++ b/analytics/src/test/java/com/segment/analytics/TypedTransformerTest.java
@@ -1,7 +1,7 @@
package com.segment.analytics;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import com.segment.analytics.messages.AliasMessage;
import com.segment.analytics.messages.GroupMessage;
@@ -18,26 +18,26 @@ public void messagesFanOutCorrectly() {
AliasMessage.Builder alias = AliasMessage.builder("foo").userId("bar");
transformer.transform(alias);
- verify(transformer).alias(alias);
+ assertFalse(transformer.alias(alias));
GroupMessage.Builder group = GroupMessage.builder("foo").userId("bar");
transformer.transform(group);
- verify(transformer).group(group);
+ assertFalse(transformer.group(group));
IdentifyMessage.Builder identify = IdentifyMessage.builder().userId("bar");
transformer.transform(identify);
- verify(transformer).identify(identify);
+ assertFalse(transformer.identify(identify));
ScreenMessage.Builder screen = ScreenMessage.builder("foo").userId("bar");
transformer.transform(screen);
- verify(transformer).screen(screen);
+ assertFalse(transformer.screen(screen));
PageMessage.Builder page = PageMessage.builder("foo").userId("bar");
transformer.transform(page);
- verify(transformer).page(page);
+ assertFalse(transformer.page(page));
TrackMessage.Builder track = TrackMessage.builder("foo").userId("bar");
transformer.transform(track);
- verify(transformer).track(track);
+ assertFalse(transformer.track(track));
}
}
diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java
index 6a5b5b88..74f04e13 100644
--- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java
+++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java
@@ -1,21 +1,27 @@
package com.segment.analytics.internal;
+import static com.segment.analytics.internal.FlushMessage.POISON;
+import static com.segment.analytics.internal.StopMessage.STOP;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.initMocks;
+import static org.mockito.MockitoAnnotations.openMocks;
+import com.google.gson.Gson;
import com.segment.analytics.Callback;
import com.segment.analytics.Log;
import com.segment.analytics.TestUtils.MessageBuilderTest;
import com.segment.analytics.http.SegmentService;
+import com.segment.analytics.http.UploadResponse;
import com.segment.analytics.internal.AnalyticsClient.BatchUploadTask;
import com.segment.analytics.messages.Batch;
import com.segment.analytics.messages.Message;
@@ -23,25 +29,32 @@
import com.segment.backo.Backo;
import com.squareup.burst.BurstJUnit4;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
+import java.util.concurrent.atomic.AtomicBoolean;
+import okhttp3.ResponseBody;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
-import retrofit.RetrofitError;
-import retrofit.client.Header;
-import retrofit.client.Response;
-import retrofit.converter.ConversionException;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import retrofit2.Call;
+import retrofit2.Response;
+import retrofit2.mock.Calls;
@RunWith(BurstJUnit4.class) //
public class AnalyticsClientTest {
@@ -49,30 +62,52 @@ public class AnalyticsClientTest {
private static final Backo BACKO =
Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build();
+ private int DEFAULT_RETRIES = 10;
+ private int MAX_BATCH_SIZE = 1024 * 500; // 500kb
+ private int MAX_MSG_SIZE = 1024 * 32; // 32kb //This is the limit for a message object
+ private int MSG_MAX_CREATE_SIZE =
+ MAX_MSG_SIZE
+          - 200; // Once we create a msg object of this size it is barely below the 32 kb
+  // threshold, which makes it suitable for these tests
+ private static String writeKey = "writeKey";
+
Log log = Log.NONE;
+
ThreadFactory threadFactory;
- @Mock BlockingQueue messageQueue;
+ @Spy LinkedBlockingQueue messageQueue;
@Mock SegmentService segmentService;
@Mock ExecutorService networkExecutor;
@Mock Callback callback;
+ @Mock UploadResponse response;
+
+ AtomicBoolean isShutDown;
@Before
public void setUp() {
- initMocks(this);
+ openMocks(this);
+
+ isShutDown = new AtomicBoolean(false);
threadFactory = Executors.defaultThreadFactory();
}
- // Defers loading the client until tests can initialize all required dependencies.
+ // Defers loading the client until tests can initialize all required
+ // dependencies.
AnalyticsClient newClient() {
return new AnalyticsClient(
messageQueue,
+ null,
segmentService,
50,
TimeUnit.HOURS.toMillis(1),
+ 0,
+ MAX_BATCH_SIZE,
log,
threadFactory,
networkExecutor,
- Collections.singletonList(callback));
+ Collections.singletonList(callback),
+ isShutDown,
+ writeKey,
+ new Gson());
}
@Test
@@ -86,13 +121,14 @@ public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedExc
}
@Test
- public void shutdown() {
+ public void shutdown() throws InterruptedException {
+ messageQueue = new LinkedBlockingQueue<>();
AnalyticsClient client = newClient();
client.shutdown();
- verify(messageQueue).clear();
verify(networkExecutor).shutdown();
+ verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
}
@Test
@@ -106,7 +142,7 @@ public void flushInsertsPoison() throws InterruptedException {
/** Wait until the queue is drained. */
static void wait(Queue> queue) {
- //noinspection StatementWithEmptyBody
+ // noinspection StatementWithEmptyBody
while (queue.size() > 0) {}
}
@@ -121,6 +157,47 @@ static Batch captureBatch(ExecutorService executor) {
return task.batch;
}
+ private static String generateDataOfSize(int msgSize) {
+ char[] chars = new char[msgSize];
+ Arrays.fill(chars, 'a');
+
+ return new String(chars);
+ }
+
+ private static String generateDataOfSizeSpecialChars(
+ int sizeInBytes, boolean slightlyBelowLimit) {
+ StringBuilder builder = new StringBuilder();
+ Character[] specialChars = new Character[] {'$', 'ยข', 'เคน', 'โฌ', 'ํ', 'ยฉ', 'ยถ'};
+ int currentSize = 0;
+ String smileyFace = "\uD83D\uDE01";
+ // ๐ = '\uD83D\uDE01';
+ Random rand = new Random();
+ int loopCount = 1;
+ while (currentSize < sizeInBytes) {
+ int randomNum;
+ // decide if regular/special character
+ if (loopCount > 3 && loopCount % 4 == 0) {
+ randomNum = rand.nextInt(((specialChars.length - 1) - 0) + 1) + 0;
+ builder.append(specialChars[randomNum]);
+ } else if (loopCount > 9 && loopCount % 10 == 0) {
+ builder.append(smileyFace);
+ } else {
+ // random letter from a - z
+ randomNum = rand.nextInt(('z' - 'a') + 1) + 'a';
+ builder.append((char) randomNum);
+ }
+
+ // check size so far
+ String temp = builder.toString();
+ currentSize = temp.getBytes(StandardCharsets.UTF_8).length;
+ if (slightlyBelowLimit && ((sizeInBytes - currentSize) < 500)) {
+ break;
+ }
+ loopCount++;
+ }
+ return builder.toString();
+ }
+
@Test
public void flushSubmitsToExecutor() {
messageQueue = new LinkedBlockingQueue<>();
@@ -151,6 +228,125 @@ public void enqueueMaxTriggersFlush() {
assertThat(captureBatch(networkExecutor).batch()).hasSize(50);
}
+ @Test
+ public void shouldBeAbleToCalculateMessageSize() {
+ AnalyticsClient client = newClient();
+ Map properties = new HashMap();
+
+ properties.put("property1", generateDataOfSize(1024 * 33));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Big Event").userId("bar").properties(properties).build();
+ try {
+ client.enqueue(bigMessage);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).isExactlyInstanceOf(e.getClass());
+ }
+
+ // can't test for exact size cause other attributes come in play
+ assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33);
+ }
+
+ @Test
+ public void dontFlushUntilReachesMaxSize() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ Map properties = new HashMap();
+
+ properties.put("property2", generateDataOfSize(MAX_BATCH_SIZE - 200));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Big Event").userId("bar").properties(properties).build();
+ try {
+ client.enqueue(bigMessage);
+ } catch (IllegalArgumentException e) {
+ // throw new InterruptedException(e.getMessage());
+ }
+
+ wait(messageQueue);
+
+ verify(networkExecutor, never()).submit(any(Runnable.class));
+ }
+
+ /**
+   * Modified this test case since the logic was changed to NOT allow individual messages bigger
+   * than 32 KB to be enqueued, so the size of the generated message had to be lowered here.
+   * MSG_MAX_CREATE_SIZE was chosen because it generates a message just below the 32 KB limit once
+   * the Message object is created. The number of events was also adjusted because batch creation
+   * was changed to not allow batches larger than 500 KB, meaning every 15/16 events the queue is
+   * backpressured and poisoned/flushed (3 times — the purpose of this test), AND 4 batches are
+   * submitted (15 msgs, 1 msg, 15 msgs, 15 msgs), so the purpose of the test case still
+   * stands.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedException {
+ AnalyticsClient client =
+ new AnalyticsClient(
+ messageQueue,
+ null,
+ segmentService,
+ 50,
+ TimeUnit.HOURS.toMillis(1),
+ 0,
+ MAX_BATCH_SIZE * 4,
+ log,
+ threadFactory,
+ networkExecutor,
+ Collections.singletonList(callback),
+ isShutDown,
+ writeKey,
+ new Gson());
+
+ Map properties = new HashMap();
+
+ properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE));
+
+ for (int i = 0; i < 46; i++) {
+ TrackMessage bigMessage =
+ TrackMessage.builder("Big Event").userId("bar").properties(properties).build();
+ client.enqueue(bigMessage);
+ verify(messageQueue).put(bigMessage);
+ }
+
+ wait(messageQueue);
+ /**
+ * modified from expected 4 to expected 3 times, since we removed the inner loop. The inner loop
+ * was forcing to message list created from the queue to keep making batches even if its a 1
+ * message batch until the message list is empty, that was forcing the code to make one last
+ * batch of 1 msg in size bumping the number of times a batch would be submitted from 3 to 4
+ */
+ verify(networkExecutor, times(3)).submit(any(Runnable.class));
+ }
+
+ /**
+ * Had to slightly change test case since we are now modifying the logic to NOT allow messages
+ * above 32 KB in size So needed to change size of generated msg to MSG_MAX_CREATE_SIZE to keep
+ * purpose of test case intact which is to test the scenario for several messages eventually
+ * filling up the queue and flushing. Batches submitted will change from 1 to 2 because the queue
+ * will be backpressured at 16 (at this point queue is over the 500KB batch limit so its flushed
+ * and when batch is created 16 will be above 500kbs limit so it creates one batch for 15 msg and
+ * another one for the remaining single message so 500kb limit per batch is not violated
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void flushWhenMultipleMessagesReachesMaxSize() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ Map properties = new HashMap();
+ properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE));
+
+ for (int i = 0; i < 16; i++) {
+ TrackMessage bigMessage =
+ TrackMessage.builder("Big Event").userId("bar").properties(properties).build();
+ client.enqueue(bigMessage);
+ }
+ wait(messageQueue);
+ client.shutdown();
+ while (!isShutDown.get()) {}
+ verify(networkExecutor, times(2)).submit(any(Runnable.class));
+ }
+
@Test
public void enqueueBeforeMaxDoesNotTriggerFlush() {
messageQueue = new LinkedBlockingQueue<>();
@@ -167,7 +363,8 @@ public void enqueueBeforeMaxDoesNotTriggerFlush() {
}
static Batch batchFor(Message message) {
- return Batch.create(Collections.emptyMap(), Collections.singletonList(message));
+ return Batch.create(
+ Collections.emptyMap(), Collections.singletonList(message), writeKey);
}
@Test
@@ -176,19 +373,21 @@ public void batchRetriesForNetworkErrors() {
TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
Batch batch = batchFor(trackMessage);
+ Response successResponse = Response.success(200, response);
+ Response failureResponse = Response.error(429, ResponseBody.create(null, ""));
+
// Throw a network error 3 times.
- RetrofitError retrofitError = RetrofitError.networkError(null, new IOException());
- when(segmentService.upload(batch))
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenReturn(null);
-
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+ when(segmentService.upload(null, batch))
+ .thenReturn(Calls.response(failureResponse))
+ .thenReturn(Calls.response(failureResponse))
+ .thenReturn(Calls.response(failureResponse))
+ .thenReturn(Calls.response(successResponse));
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();
// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
- verify(segmentService, times(4)).upload(batch);
+ verify(segmentService, times(4)).upload(null, batch);
verify(callback).success(trackMessage);
}
@@ -199,21 +398,21 @@ public void batchRetriesForHTTP5xxErrors() {
Batch batch = batchFor(trackMessage);
// Throw a HTTP error 3 times.
- Response response =
- new Response(
- "https://api.segment.io", 500, "Server Error", Collections.emptyList(), null);
- RetrofitError retrofitError = RetrofitError.httpError(null, response, null, null);
- when(segmentService.upload(batch))
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenReturn(null);
-
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+
+ Response successResponse = Response.success(200, response);
+ Response failResponse =
+ Response.error(500, ResponseBody.create(null, "Server Error"));
+ when(segmentService.upload(null, batch))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(successResponse));
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();
// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
- verify(segmentService, times(4)).upload(batch);
+ verify(segmentService, times(4)).upload(null, batch);
verify(callback).success(trackMessage);
}
@@ -224,21 +423,20 @@ public void batchRetriesForHTTP429Errors() {
Batch batch = batchFor(trackMessage);
// Throw a HTTP error 3 times.
- Response response =
- new Response(
- "https://api.segment.io", 429, "Rate Limited", Collections.emptyList(), null);
- RetrofitError retrofitError = RetrofitError.httpError(null, response, null, null);
- when(segmentService.upload(batch))
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenThrow(retrofitError)
- .thenReturn(null);
-
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+ Response successResponse = Response.success(200, response);
+ Response failResponse =
+ Response.error(429, ResponseBody.create(null, "Rate Limited"));
+ when(segmentService.upload(null, batch))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(failResponse))
+ .thenReturn(Calls.response(successResponse));
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();
// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
- verify(segmentService, times(4)).upload(batch);
+ verify(segmentService, times(4)).upload(null, batch);
verify(callback).success(trackMessage);
}
@@ -249,18 +447,16 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() {
Batch batch = batchFor(trackMessage);
// Throw a HTTP error that should not be retried.
- Response response =
- new Response(
- "https://api.segment.io", 404, "Not Found", Collections.emptyList(), null);
- RetrofitError retrofitError = RetrofitError.httpError(null, response, null, null);
- doThrow(retrofitError).when(segmentService).upload(batch);
+ Response failResponse =
+ Response.error(404, ResponseBody.create(null, "Not Found"));
+ when(segmentService.upload(null, batch)).thenReturn(Calls.response(failResponse));
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();
// Verify we only tried to upload once.
- verify(segmentService).upload(batch);
- verify(callback).failure(trackMessage, retrofitError);
+ verify(segmentService).upload(null, batch);
+ verify(callback).failure(eq(trackMessage), any(IOException.class));
}
@Test
@@ -268,16 +464,16 @@ public void batchDoesNotRetryForNonNetworkErrors() {
AnalyticsClient client = newClient();
TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
Batch batch = batchFor(trackMessage);
- RetrofitError retrofitError =
- RetrofitError.conversionError(null, null, null, null, new ConversionException("fake"));
- doThrow(retrofitError).when(segmentService).upload(batch);
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+ Call networkFailure = Calls.failure(new RuntimeException());
+ when(segmentService.upload(null, batch)).thenReturn(networkFailure);
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();
// Verify we only tried to upload once.
- verify(segmentService).upload(batch);
- verify(callback).failure(trackMessage, retrofitError);
+ verify(segmentService).upload(null, batch);
+ verify(callback).failure(eq(trackMessage), any(RuntimeException.class));
}
@Test
@@ -285,29 +481,489 @@ public void givesUpAfterMaxRetries() {
AnalyticsClient client = newClient();
TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
Batch batch = batchFor(trackMessage);
- RetrofitError retrofitError = RetrofitError.networkError(null, new IOException());
- when(segmentService.upload(batch)).thenThrow(retrofitError);
- BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
+ when(segmentService.upload(null, batch))
+ .thenAnswer(
+ new Answer>() {
+ public Call answer(InvocationOnMock invocation) {
+ Response failResponse =
+ Response.error(429, ResponseBody.create(null, "Not Found"));
+ return Calls.response(failResponse);
+ }
+ });
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10);
+ batchUploadTask.run();
+
+    // maxRetries is explicitly set to 10 for this task
+    // tries 11 times (one normal run + 10 retries) even though the default is 50 in AnalyticsClient.java
+ verify(segmentService, times(11)).upload(null, batch);
+ verify(callback)
+ .failure(
+ eq(trackMessage),
+ argThat(
+ new ArgumentMatcher() {
+ @Override
+ public boolean matches(IOException exception) {
+ return exception.getMessage().equals("11 retries exhausted");
+ }
+ }));
+ }
+
+ @Test
+ public void hasDefaultRetriesSetTo3() {
+ AnalyticsClient client = newClient();
+ TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
+ Batch batch = batchFor(trackMessage);
+
+ when(segmentService.upload(null, batch))
+ .thenAnswer(
+ new Answer>() {
+ public Call answer(InvocationOnMock invocation) {
+ Response failResponse =
+ Response.error(429, ResponseBody.create(null, "Not Found"));
+ return Calls.response(failResponse);
+ }
+ });
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 3);
batchUploadTask.run();
- // 50 == MAX_ATTEMPTS in AnalyticsClient.java
- verify(segmentService, times(50)).upload(batch);
+    // maxRetries is explicitly set to 3 for this task
+    // tries 4 times (one normal run + 3 retries)
+ verify(segmentService, times(4)).upload(null, batch);
verify(callback)
.failure(
eq(trackMessage),
argThat(
- new TypeSafeMatcher() {
+ new ArgumentMatcher() {
@Override
- public void describeTo(Description description) {
- description.appendText("expected IOException");
+ public boolean matches(IOException exception) {
+ return exception.getMessage().equals("4 retries exhausted");
}
+ }));
+ }
+
+ @Test
+ public void flushWhenNotShutDown() throws InterruptedException {
+ AnalyticsClient client = newClient();
+
+ client.flush();
+ verify(messageQueue).put(POISON);
+ }
+
+ @Test
+ public void flushWhenShutDown() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ isShutDown.set(true);
+
+ client.flush();
+
+ verify(messageQueue, times(0)).put(any(Message.class));
+ }
+
+ @Test
+ public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder)
+ throws InterruptedException {
+ AnalyticsClient client = newClient();
+
+ final Message message = builder.get().userId("foo").build();
+ client.enqueue(message);
+
+ verify(messageQueue).put(message);
+ }
+
+ @Test
+ public void enqueueWithRegularMessageWhenShutdown(MessageBuilderTest builder)
+ throws InterruptedException {
+ AnalyticsClient client = newClient();
+ isShutDown.set(true);
+
+ client.enqueue(builder.get().userId("foo").build());
+
+ verify(messageQueue, times(0)).put(any(Message.class));
+ }
+
+ @Test
+ public void enqueueWithStopMessageWhenShutdown() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ isShutDown.set(true);
+
+ client.enqueue(STOP);
+
+ verify(messageQueue).put(STOP);
+ }
+
+ @Test
+ public void shutdownWhenAlreadyShutDown() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ isShutDown.set(true);
+
+ client.shutdown();
+
+ verify(messageQueue, times(0)).put(any(Message.class));
+ verifyNoInteractions(networkExecutor, callback, segmentService);
+ }
+
+ @Test
+ public void shutdownWithNoMessageInTheQueue() throws InterruptedException {
+ AnalyticsClient client = newClient();
+ client.shutdown();
+
+ verify(messageQueue).put(STOP);
+ verify(networkExecutor).shutdown();
+ verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
+ verifyNoMoreInteractions(networkExecutor);
+ }
+
+ @Test
+ public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder)
+ throws InterruptedException {
+ AnalyticsClient client = newClient();
+
+ client.enqueue(builder.get().userId("foo").build());
+ client.shutdown();
+
+ verify(messageQueue).put(STOP);
+ verify(networkExecutor).shutdown();
+ verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
+ verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class));
+ }
+
+ @Test
+ public void neverRetries() {
+ AnalyticsClient client = newClient();
+ TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
+ Batch batch = batchFor(trackMessage);
+ when(segmentService.upload(null, batch))
+ .thenAnswer(
+ new Answer>() {
+ public Call answer(InvocationOnMock invocation) {
+ Response failResponse =
+ Response.error(429, ResponseBody.create(null, "Not Found"));
+ return Calls.response(failResponse);
+ }
+ });
+
+ BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0);
+ batchUploadTask.run();
+
+ // runs once but never retries
+ verify(segmentService, times(1)).upload(null, batch);
+ verify(callback)
+ .failure(
+ eq(trackMessage),
+ argThat(
+ new ArgumentMatcher() {
@Override
- protected boolean matchesSafely(Throwable item) {
- IOException exception = (IOException) item;
- return exception.getMessage().equals("50 retries exhausted");
+ public boolean matches(IOException exception) {
+ return exception.getMessage().equals("1 retries exhausted");
}
}));
}
+
+ /**
+ * **********************************************************************************************
+ * Test cases for Size check
+ * *********************************************************************************************
+ */
+
+ /** Individual Size check happy path regular chars */
+ @Test
+ public void checkForIndividualMessageSizeLessThanLimit() {
+ AnalyticsClient client = newClient();
+ int msgSize = 1024 * 31; // 31KB
+ int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768
+ Map properties = new HashMap();
+
+ properties.put("property1", generateDataOfSize(msgSize));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Event").userId("jorgen25").properties(properties).build();
+ client.enqueue(bigMessage);
+
+ int msgActualSize = client.messageSizeInBytes(bigMessage);
+ assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit);
+ }
+
+ /** Individual Size check sad path regular chars (over the limit) */
+ @Test
+ public void checkForIndividualMessageSizeOverLimit() throws IllegalArgumentException {
+ AnalyticsClient client = newClient();
+ int msgSize = MAX_MSG_SIZE + 1; // BARELY over the limit
+ int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768
+ Map properties = new HashMap();
+
+ properties.put("property1", generateDataOfSize(msgSize));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Event").userId("jorgen25").properties(properties).build();
+ try {
+ client.enqueue(bigMessage);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).isExactlyInstanceOf(e.getClass());
+ }
+
+ int msgActualSize = client.messageSizeInBytes(bigMessage);
+ assertThat(msgActualSize).isGreaterThan(sizeLimit);
+ }
+
+ /** Individual Size check happy path special chars */
+ @Test
+ public void checkForIndividualMessageSizeSpecialCharsLessThanLimit() {
+ AnalyticsClient client = newClient();
+ int msgSize = MAX_MSG_SIZE; // 32KB
+ int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768
+
+ Map properties = new HashMap();
+ properties.put("property1", generateDataOfSizeSpecialChars(msgSize, true));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Event").userId("jorgen25").properties(properties).build();
+ client.enqueue(bigMessage);
+
+ int msgActualSize = client.messageSizeInBytes(bigMessage);
+ assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit);
+ }
+
+ /** Individual Size check sad path special chars (over the limit) */
+ @Test
+ public void checkForIndividualMessageSizeSpecialCharsAboveLimit() {
+ AnalyticsClient client = newClient();
+ int msgSize = MAX_MSG_SIZE; // 32KB
+ int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768
+ Map properties = new HashMap();
+
+ properties.put("property1", generateDataOfSizeSpecialChars(msgSize, false));
+
+ TrackMessage bigMessage =
+ TrackMessage.builder("Event").userId("jorgen25").properties(properties).build();
+
+ try {
+ client.enqueue(bigMessage);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).isExactlyInstanceOf(e.getClass());
+ }
+
+ int msgActualSize = client.messageSizeInBytes(bigMessage);
+ assertThat(msgActualSize).isGreaterThan(sizeLimit);
+ }
+
+ /**
+ * *****************************************************************************************************************
+ * Test cases for enqueue modified logic
+ * ***************************************************************************************************************
+ */
+ @Test
+ public void enqueueVerifyPoisonIsNotCheckedForSize() throws InterruptedException {
+ AnalyticsClient clientSpy = spy(newClient());
+
+ clientSpy.enqueue(POISON);
+ verify(messageQueue).put(POISON);
+ verify(clientSpy, never()).messageSizeInBytes(POISON);
+ }
+
+ @Test
+ public void enqueueVerifyStopIsNotCheckedForSize() throws InterruptedException {
+ AnalyticsClient clientSpy = spy(newClient());
+
+ clientSpy.enqueue(STOP);
+ verify(messageQueue).put(STOP);
+ verify(clientSpy, never()).messageSizeInBytes(STOP);
+ }
+
+ @Test
+ public void enqueueVerifyRegularMessageIsEnqueuedAndCheckedForSize(MessageBuilderTest builder)
+ throws InterruptedException {
+ AnalyticsClient clientSpy = spy(newClient());
+
+ Message message = builder.get().userId("jorgen25").build();
+ clientSpy.enqueue(message);
+ verify(messageQueue).put(message);
+ verify(clientSpy, times(1)).messageSizeInBytes(message);
+ }
+
+ /**
+ * This test case was to prove the limit in batch is not being respected so will probably delete
+ * it later NOTE: Used to be a test case created to prove huge messages above the limit are still
+ * being submitted in batch modified it to prove they are not anymore after changing logic in
+ * analyticsClient
+ *
+ * @param builder
+ * @throws InterruptedException
+ */
+ @Test
+ public void enqueueSingleMessageAboveLimitWhenNotShutdown(MessageBuilderTest builder)
+ throws InterruptedException, IllegalArgumentException {
+ AnalyticsClient client = newClient();
+
+ // Message is above batch limit
+ final String massData = generateDataOfSizeSpecialChars(MAX_MSG_SIZE, false);
+ Map integrationOpts = new HashMap<>();
+ integrationOpts.put("massData", massData);
+ Message message =
+ builder.get().userId("foo").integrationOptions("someKey", integrationOpts).build();
+
+ try {
+ client.enqueue(message);
+ } catch (IllegalArgumentException e) {
+ assertThat(e).isExactlyInstanceOf(e.getClass());
+ }
+
+ wait(messageQueue);
+
+ // Message is above MSG/BATCH size limit so it should not be put in queue
+ verify(messageQueue, never()).put(message);
+ // And since it was never in the queue, it was never submitted in batch
+ verify(networkExecutor, never()).submit(any(AnalyticsClient.BatchUploadTask.class));
+ }
+
+ @Test
+ public void enqueueVerifyRegularMessagesSpecialCharactersBelowLimit(MessageBuilderTest builder)
+ throws InterruptedException, IllegalArgumentException {
+ AnalyticsClient client = newClient();
+ int msgSize = 1024 * 18; // 18KB
+
+ for (int i = 0; i < 2; i++) {
+ final String data = generateDataOfSizeSpecialChars(msgSize, true);
+ Map integrationOpts = new HashMap<>();
+ integrationOpts.put("data", data);
+ Message message =
+ builder.get().userId("jorgen25").integrationOptions("someKey", integrationOpts).build();
+ client.enqueue(message);
+ verify(messageQueue).put(message);
+ }
+ client.enqueue(POISON);
+ verify(messageQueue).put(POISON);
+
+ wait(messageQueue);
+ client.shutdown();
+ while (!isShutDown.get()) {}
+
+ verify(networkExecutor, times(1)).submit(any(AnalyticsClient.BatchUploadTask.class));
+ }
+
+ /**
+ * ******************************************************************************************************************
+ * Test cases for Batch creation logic
+ * ****************************************************************************************************************
+ */
+
+ /**
+ * Several messages are enqueued and then submitted in a batch
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgumentException {
+ AnalyticsClient client =
+ new AnalyticsClient(
+ messageQueue,
+ null,
+ segmentService,
+ 50,
+ TimeUnit.HOURS.toMillis(1),
+ 0,
+ MAX_BATCH_SIZE * 4,
+ log,
+ threadFactory,
+ networkExecutor,
+ Collections.singletonList(callback),
+ isShutDown,
+ writeKey,
+ new Gson());
+
+ Map properties = new HashMap();
+ properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true));
+
+ for (int i = 0; i < 15; i++) {
+ TrackMessage bigMessage =
+ TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build();
+ client.enqueue(bigMessage);
+ verify(messageQueue).put(bigMessage);
+ }
+ client.enqueue(POISON);
+ wait(messageQueue);
+
+ client.shutdown();
+ while (!isShutDown.get()) {}
+ verify(networkExecutor, times(1)).submit(any(Runnable.class));
+ }
+
+ /**
+ * Enqueued several messages above threshold of 500Kbs so queue gets backpressured at some point
+ * and several batches have to be created to not violate threshold
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgumentException {
+ AnalyticsClient client =
+ new AnalyticsClient(
+ messageQueue,
+ null,
+ segmentService,
+ 50,
+ TimeUnit.HOURS.toMillis(1),
+ 0,
+ MAX_BATCH_SIZE * 4,
+ log,
+ threadFactory,
+ networkExecutor,
+ Collections.singletonList(callback),
+ isShutDown,
+ writeKey,
+ new Gson());
+
+ Map properties = new HashMap();
+ properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true));
+
+ for (int i = 0; i < 100; i++) {
+ TrackMessage message =
+ TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build();
+ client.enqueue(message);
+ verify(messageQueue).put(message);
+ }
+ wait(messageQueue);
+ client.shutdown();
+ while (!isShutDown.get()) {}
+
+ verify(networkExecutor, times(8)).submit(any(Runnable.class));
+ }
+
+ @Test
+ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedException {
+ AnalyticsClient client =
+ new AnalyticsClient(
+ messageQueue,
+ null,
+ segmentService,
+ 50,
+ TimeUnit.HOURS.toMillis(1),
+ 0,
+ MAX_BATCH_SIZE * 4,
+ log,
+ threadFactory,
+ networkExecutor,
+ Collections.singletonList(callback),
+ isShutDown,
+ writeKey,
+ new Gson());
+
+ Map properties = new HashMap();
+ properties.put("property3", generateDataOfSizeSpecialChars(1024 * 8, true));
+
+ for (int i = 0; i < 600; i++) {
+ TrackMessage message =
+ TrackMessage.builder("Event").userId("jorgen25").properties(properties).build();
+ client.enqueue(message);
+ verify(messageQueue).put(message);
+ }
+ wait(messageQueue);
+ client.shutdown();
+ while (!isShutDown.get()) {}
+
+ verify(networkExecutor, times(21)).submit(any(Runnable.class));
+ }
}
diff --git a/e2e-cli/README.md b/e2e-cli/README.md
new file mode 100644
index 00000000..b319749e
--- /dev/null
+++ b/e2e-cli/README.md
@@ -0,0 +1,54 @@
+# analytics-java e2e-cli
+
+E2E test CLI for the [analytics-java](https://github.com/segmentio/analytics-java) SDK. Accepts a JSON input describing events and SDK configuration, sends them through the real SDK, and outputs results as JSON.
+
+Built with Kotlin (JVM) and packaged as a fat jar via Maven.
+
+## Setup
+
+```bash
+mvn package -pl e2e-cli -am
+```
+
+## Usage
+
+```bash
+java -jar e2e-cli/target/e2e-cli-*-jar-with-dependencies.jar --input '{"writeKey":"...", ...}'
+```
+
+## Input Format
+
+```jsonc
+{
+ "writeKey": "your-write-key", // required
+ "apiHost": "https://...", // optional — SDK default if omitted
+ "sequences": [ // required — event sequences to send
+ {
+ "delayMs": 0,
+ "events": [
+ { "type": "track", "event": "Test", "userId": "user-1" }
+ ]
+ }
+ ],
+ "config": { // optional
+ "flushAt": 250,
+ "flushInterval": 10000,
+ "maxRetries": 3,
+ "timeout": 15
+ }
+}
+```
+
+Note: Java is a server-side SDK — there is no CDN settings fetch, so `cdnHost` does not apply.
+
+## Output Format
+
+```json
+{ "success": true, "sentBatches": 1 }
+```
+
+On failure:
+
+```json
+{ "success": false, "error": "description", "sentBatches": 0 }
+```
diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json
new file mode 100644
index 00000000..db3d0167
--- /dev/null
+++ b/e2e-cli/e2e-config.json
@@ -0,0 +1,7 @@
+{
+ "sdk": "java",
+ "test_suites": "basic",
+ "auto_settings": false,
+ "patch": null,
+ "env": {}
+}
diff --git a/e2e-cli/pom.xml b/e2e-cli/pom.xml
new file mode 100644
index 00000000..b5782b8f
--- /dev/null
+++ b/e2e-cli/pom.xml
@@ -0,0 +1,91 @@
+
+
+
+ 4.0.0
+
+
+ analytics-parent
+ com.segment.analytics.java
+ 3.5.5-SNAPSHOT
+
+
+ com.segment.analytics.java
+ e2e-cli
+ 3.5.5-SNAPSHOT
+ Analytics Java E2E CLI
+
+ E2E testing CLI for Segment Analytics for Java.
+
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ ${kotlin.version}
+
+
+ com.segment.analytics.java
+ analytics
+ ${project.version}
+
+
+ org.jetbrains.kotlinx
+ kotlinx-serialization-json
+ 1.4.1
+
+
+
+
+ src/main/kotlin
+
+
+
+ kotlin-maven-plugin
+ org.jetbrains.kotlin
+ ${kotlin.version}
+
+
+ kotlinx-serialization
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-maven-serialization
+ ${kotlin.version}
+
+
+
+
+ compile
+ compile
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+ cli.MainKt
+
+
+
+
+
+ package
+
+ single
+
+
+
+
+
+
+
diff --git a/e2e-cli/run-e2e.sh b/e2e-cli/run-e2e.sh
new file mode 100755
index 00000000..782df0e8
--- /dev/null
+++ b/e2e-cli/run-e2e.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Run E2E tests for analytics-java
+#
+# Prerequisites: Java 11+, Maven, Node.js 18+
+#
+# Usage:
+# ./run-e2e.sh [extra args passed to run-tests.sh]
+#
+# Override sdk-e2e-tests location:
+# E2E_TESTS_DIR=../my-e2e-tests ./run-e2e.sh
+#
+
+set -e
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+SDK_ROOT="$SCRIPT_DIR/.."
+E2E_DIR="${E2E_TESTS_DIR:-$SDK_ROOT/../sdk-e2e-tests}"
+
+echo "=== Building analytics-java e2e-cli ==="
+
+# Build SDK and e2e-cli
+cd "$SDK_ROOT"
+mvn package -pl e2e-cli -am -DskipTests
+
+# Find the built jar
+CLI_JAR=$(find "$SDK_ROOT/e2e-cli/target" -name "e2e-cli-*-jar-with-dependencies.jar" | head -1)
+if [[ -z "$CLI_JAR" ]]; then
+ echo "Error: Could not find e2e-cli jar"
+ exit 1
+fi
+echo "Found jar: $CLI_JAR"
+
+echo ""
+
+# Run tests
+cd "$E2E_DIR"
+./scripts/run-tests.sh \
+ --sdk-dir "$SCRIPT_DIR" \
+ --cli "java -jar $CLI_JAR" \
+ "$@"
diff --git a/e2e-cli/src/main/kotlin/cli/Main.kt b/e2e-cli/src/main/kotlin/cli/Main.kt
new file mode 100644
index 00000000..b4577615
--- /dev/null
+++ b/e2e-cli/src/main/kotlin/cli/Main.kt
@@ -0,0 +1,189 @@
+package cli
+
+import com.google.gson.Gson
+import com.google.gson.reflect.TypeToken
+import com.segment.analytics.Analytics
+import com.segment.analytics.Callback
+import com.segment.analytics.messages.*
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+data class CLIOutput(
+ val success: Boolean,
+ val error: String? = null,
+ val sentBatches: Int = 0
+)
+
+data class CLIConfig(
+ val flushAt: Int? = null,
+ val flushInterval: Long? = null,
+ val maxRetries: Int? = null,
+ val timeout: Int? = null
+)
+
+data class EventSequence(
+ val delayMs: Long = 0,
+ val events: List