Instrument Netty's EventLoop for deterministic execution of scheduled tasks

Tags: java, netty

I'd like to instrument Netty's EventLoop in order to:

  • Run tasks in a deterministic order.
  • Grab the deadline of scheduled tasks.
  • Fast-forward a virtual clock, triggering execution of deadlined tasks.

I know about EmbeddedChannel and I use it in some tests. But what I want is something between unit testing and integration testing (which remains blind to some corner cases). Disconnection/reconnection and ping scenarios use scheduled tasks a lot. I could add determinism with huge delays, but I don't want my tests to wait for seconds or more. So instrumenting Netty's EventLoop looks like the solution.

I've already written code that makes sense, at least to me.

  • I modify ScheduledFutureTask#nanoTime to return a value of my own.
  • I derive NioEventLoopGroup so I can grab task deadlines.
  • I alter the value returned by ScheduledFutureTask#nanoTime to fast-forward the clock.

What is nice is that Netty code only relies on the value returned by ScheduledFutureTask#nanoTime (great design!) so it is a very limited change. I use ByteBuddy to avoid copy-pasting Netty code, but this is not important.
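Since the hack only needs a `LongSupplier` (that is what `ScheduledFutureTaskHack.install` accepts), a fast-forwardable virtual clock can be a very small class. The tests below pass `AtomicLong::get` directly; the following is a minimal sketch that wraps the same idea and refuses to go backwards. `VirtualNanoClock` and `advance` are hypothetical names, not part of Netty or of the original harness.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Hypothetical virtual clock: its value only moves when the test says so.
 * Being a LongSupplier, it can be passed to ScheduledFutureTaskHack.install( ... ).
 */
final class VirtualNanoClock implements LongSupplier {

  private final AtomicLong nanos = new AtomicLong( 0 ) ;

  @Override
  public long getAsLong() {
    return nanos.get() ;
  }

  /**
   * Moves the clock forward and returns the new value.
   * Going backwards is refused, since the real clock it stands in for is expected
   * to be monotonic.
   */
  long advance( final long amount, final TimeUnit unit ) {
    if( amount < 0 ) {
      throw new IllegalArgumentException( "Cannot go back in time: " + amount ) ;
    }
    return nanos.addAndGet( unit.toNanos( amount ) ) ;
  }
}
```

Usage would be `ScheduledFutureTaskHack.install( clock )` followed by `clock.advance( 1, TimeUnit.HOURS )` when the test wants time to pass.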

A very simple test like InstrumentedNioEventLoopGroupTest fails when scheduling only one task, because AbstractScheduledEventExecutor#pollScheduledTask(long) finds a null queue.

I discovered that each NioEventLoop has its own task queue, and queue polling might not happen because the NioEventLoop waits for its Selector to signal something, which makes sense. So I increased the thread count of the NioEventLoopGroup to 2. I also tried setting ioRatio to 1 and scheduling more tasks, with no better results. Using the debugger, it seems that my tasks always "fall" into the task queue that is not polled.

Any idea to make this work? I'm using Netty 4.1.24.Final.

ScheduledFutureTaskHack.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.ToStringTools;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.dynamic.loading.ClassReloadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

/**
 *
 * Got the delegation working with the help of
 * https://www.infoq.com/articles/Easily-Create-Java-Agents-with-ByteBuddy
 */
final class ScheduledFutureTaskHack {

  private static final Logger LOGGER = LoggerFactory.getLogger( ScheduledFutureTaskHack.class ) ;

  private static final Class< ? > SCHEDULEDFUTURETASK_CLASS ;
  private static final Method SCHEDULEDFUTURETASK_NANOTIME_METHOD ;
  private static final Method SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD ;
  private static final Field SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD ;
  private static final Field SCHEDULEDFUTURETASK_STARTTIME_FIELD ;
  static {
    try {
      SCHEDULEDFUTURETASK_CLASS = Class.forName( "io.netty.util.concurrent.ScheduledFutureTask" ) ;
      SCHEDULEDFUTURETASK_NANOTIME_METHOD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "nanoTime" ) ;
      SCHEDULEDFUTURETASK_NANOTIME_METHOD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredMethod( "deadlineNanos") ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "deadlineNanos" ) ;
      SCHEDULEDFUTURETASK_DEADLINENANOS_FIELD.setAccessible( true ) ;
      SCHEDULEDFUTURETASK_STARTTIME_FIELD =
          SCHEDULEDFUTURETASK_CLASS.getDeclaredField( "START_TIME" ) ;
      SCHEDULEDFUTURETASK_STARTTIME_FIELD.setAccessible( true ) ;
    } catch( ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e ) {
      throw new Error( e ) ;
    }
  }

  /**
   * Everything in this class must be visible from the redefined class.
   */
  @SuppressWarnings( "unused" )
  public static final class StaticMethodDelegate {
    /**
     * Calls to {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()} are redirected
     * to this method.
     * Sadly we can't use a parameter annotated with {@code @This} or anything else giving a hint
     * about the call context. It looks like a consequence of JVMTI reload not supporting method
     * addition (adding a parameter would imply creating a new method).
     */
    public static long nanoTime() {
      final long supplied = longSupplier.getAsLong() ;
      LOGGER.debug( "Called " + StaticMethodDelegate.class.getSimpleName() + "#nanoTime(), " +
          "returns " + supplied + "." ) ;
      return supplied ;
    }

  }

  private static LongSupplier longSupplier = null ;

  static void install( final LongSupplier longSupplier ) {
    install( longSupplier, true ) ;
  }

  /**
   *
   * @param longSupplier source of the values returned by the redefined
   *     {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()}.
   * @param suppliedNanosRelativeToClassloadingTime if {@code true}, supplied nanoseconds are
   *     relative to {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME}.
   *     Original behavior of the hacked method is to subtract
   *     {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME} from value returned
   *     by {@link System#nanoTime()} (probably to make number more readable and reduce the risk
   *     of an overflow). During tests we prefer to not care about start time so there is this
   *     option to add it automatically.
   */
  static void install(
      final LongSupplier longSupplier,
      final boolean suppliedNanosRelativeToClassloadingTime
  ) {
    checkState( ScheduledFutureTaskHack.longSupplier == null ) ;
    checkNotNull( longSupplier ) ;
    if( suppliedNanosRelativeToClassloadingTime ) {
      final long startTime = START_TIME ;
      LOGGER.debug(
          "Installing with value of " +
          SCHEDULEDFUTURETASK_STARTTIME_FIELD.toGenericString() +
          " = " + startTime + " automatically added to the values supplied."
      ) ;
      class AdjustedLongSupplier implements LongSupplier {
        @Override
        public long getAsLong() {
          return longSupplier.getAsLong() + startTime ;
        }
        @Override
        public String toString() {
          return ToStringTools.getNiceClassName( this ) + "{startTime=" + startTime + "}" ;
        }
      }
      ScheduledFutureTaskHack.longSupplier = new AdjustedLongSupplier() ;
    } else {
      ScheduledFutureTaskHack.longSupplier = checkNotNull( longSupplier ) ;
    }
    ByteBuddyAgent.install() ;
    LOGGER.info( "Successfully installed ByteBuddy Agent." ) ;
    redefineClass() ;
    LOGGER.info( "Successfully redefined static method implementation." ) ;
  }

  private static void redefineClass() {
    new ByteBuddy()
        .redefine( SCHEDULEDFUTURETASK_CLASS )
        .method( named( "nanoTime" )
            .and( isStatic() )
            .and( isPackagePrivate() )
            .and( takesArguments( 0 ) )
            .and( returns( long.class ) )
        )
        .intercept( MethodDelegation.to( StaticMethodDelegate.class ) )
        .make()
        .load( ScheduledFutureTaskHack.class.getClassLoader(), ClassReloadingStrategy.fromInstalledAgent() )
    ;
  }

  /**
   * Invokes method replacing {@link io.netty.util.concurrent.ScheduledFutureTask#nanoTime()}.
   */
  public static long invokeNanoTime() {
    try {
      return ( long ) SCHEDULEDFUTURETASK_NANOTIME_METHOD.invoke( null ) ;
    } catch( IllegalAccessException | InvocationTargetException e ) {
      throw new Error( e ) ;
    }
  }

  /**
   * The {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()} method returns
   * the value made from {@link System#nanoTime()},
   * minus {@link io.netty.util.concurrent.ScheduledFutureTask#START_TIME},
   * plus the delay before executing the task.
   */
  public static Long invokeDeadlineNanos( final Future future ) {
    try {
      if( SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.getDeclaringClass()
          .isAssignableFrom( future.getClass() )
      ) {
        return ( long ) SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD.invoke( future ) ;
      } else {
        return null ;
      }
    } catch( IllegalAccessException | InvocationTargetException e ) {
      throw new Error(
          "Could not access method " + SCHEDULEDFUTURETASK_DEADLINENANOS_METHOD + " in " + future,
          e
      ) ;
    }
  }

  private static long readStartTime() {
    try {
      return ( long ) SCHEDULEDFUTURETASK_STARTTIME_FIELD.get( null ) ;
    } catch( IllegalAccessException e ) {
      throw new Error(
          "Could not access static field " + SCHEDULEDFUTURETASK_STARTTIME_FIELD,
          e
      ) ;
    }
  }

  public static final long START_TIME = readStartTime() ;


}

ScheduledFutureTaskHackTest.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.ToStringTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

public class ScheduledFutureTaskHackTest {

  @Test
  public void fastForward() throws InterruptedException {

    final AtomicLong nanotimeHolder = new AtomicLong( 0 ) ;
    ScheduledFutureTaskHack.install( nanotimeHolder::get ) ;
    final long startTime = hackedNanoTime() ;

    final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup() ;
    final Semaphore scheduledTaskCompleted = new Semaphore( 0 ) ;
    nioEventLoopGroup.schedule(
        () -> {
          scheduledTaskCompleted.release() ;
          LOGGER.info( "Scheduled task completed." ) ;
        },
        1,
        TimeUnit.HOURS
    ) ;
    LOGGER.info( "Scheduled task to run in 1 hour, now fast-forwarding Netty's clock ..." ) ;

    // Test fails when disabling fast-forward below.
    nanotimeHolder.set( startTime + TimeUnit.HOURS.toNanos( 1 ) + 1 ) ;
    Thread.sleep( 1000 ) ;
    hackedNanoTime() ;
    // Amazingly Netty detected clock change and ran the task!
    assertThat( scheduledTaskCompleted.tryAcquire( 1, TimeUnit.SECONDS ) )
        .describedAs( "Scheduled task should have completed within 1 second" )
        .isTrue()
    ;

  }


// =======
// Fixture
// =======

  private static final Logger LOGGER = LoggerFactory.getLogger(
      ScheduledFutureTaskHackTest.class ) ;

  static {
    NettyTools.forceNettyClassesToLoad() ;
  }

  private static long hackedNanoTime() {
    final long nanoTime = ScheduledFutureTaskHack.invokeNanoTime() ;
    LOGGER.info(
        ToStringTools.getNiceName( ScheduledFutureTaskHack.StaticMethodDelegate.class ) +
        "#nanoTime(): " + nanoTime + "."
    ) ;
    return nanoTime ;
  }

}

InstrumentedNioEventLoopGroup.java

package com.otcdlink.chiron.integration.harness;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;

import javax.annotation.Nonnull;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkNotNull;

class InstrumentedNioEventLoopGroup extends NioEventLoopGroup {

  /**
   * Consume the value obtained from
   * {@link io.netty.util.concurrent.ScheduledFutureTask#deadlineNanos()}.
   * This is hardly mappable to an exact {@link Instant} (even if the Java flavor retains
   * nanoseconds) but this is enough to compare with {@link System#nanoTime()}.
   */
  private final Consumer< Long > scheduledTaskMomentConsumer ;

  public InstrumentedNioEventLoopGroup(
      final ThreadFactory threadFactory,
      final Consumer< Long > scheduledTaskMomentConsumer
  ) {
    // Need 2 threads because one will block on Socket Selector if there is no IO,
    // so we add one to poll Tasks.
    super( 2, threadFactory ) ;
    this.scheduledTaskMomentConsumer = checkNotNull( scheduledTaskMomentConsumer ) ;
  }

  private < FUTURE extends Future > FUTURE recordDeadlineNanos( final FUTURE future ) {
    final Long deadlineNanos = ScheduledFutureTaskHack.invokeDeadlineNanos( future ) ;
    if( deadlineNanos != null ) {
      scheduledTaskMomentConsumer.accept( deadlineNanos ) ;
    }
    return future ;
  }


  @Nonnull
  @Override
  public Future< ? > submit( final Runnable task ) {
    return recordDeadlineNanos( super.submit( task ) ) ;
  }

  @Nonnull
  @Override
  public < T > Future< T > submit(
      final Runnable task,
      final T result
  ) {
    return recordDeadlineNanos( super.submit( task, result ) ) ;
  }

  @Nonnull
  @Override
  public < T > Future< T > submit( final Callable< T > task ) {
    return recordDeadlineNanos( super.submit( task ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > schedule(
      final Runnable command,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos( super.schedule( command, delay, unit ) ) ;
  }

  @Nonnull
  @Override
  public < V > ScheduledFuture< V > schedule(
      final Callable< V > callable,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos( super.schedule( callable, delay, unit ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > scheduleAtFixedRate(
      final Runnable command,
      final long initialDelay,
      final long period,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos(
        super.scheduleAtFixedRate( command, initialDelay, period, unit ) ) ;
  }

  @Nonnull
  @Override
  public ScheduledFuture< ? > scheduleWithFixedDelay(
      final Runnable command,
      final long initialDelay,
      final long delay,
      final TimeUnit unit
  ) {
    return recordDeadlineNanos(
        super.scheduleWithFixedDelay( command, initialDelay, delay, unit ) ) ;
  }
}

InstrumentedNioEventLoopGroupTest.java

package com.otcdlink.chiron.integration.harness;

import com.otcdlink.chiron.toolbox.concurrent.ExecutorTools;
import com.otcdlink.chiron.toolbox.netty.NettyTools;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

public class InstrumentedNioEventLoopGroupTest {

  @Test
  public void recordAndAdjust() throws InterruptedException {

    final int delay = 10 ;
    final TimeUnit timeUnit = TimeUnit.SECONDS ;

    final AtomicLong nanoInstantSupplier = new AtomicLong() ;
    ScheduledFutureTaskHack.install( nanoInstantSupplier::get ) ;

    final List< Long > taskDeadlineRecorder = Collections.synchronizedList( new ArrayList<>() ) ;
    final InstrumentedNioEventLoopGroup executor = new InstrumentedNioEventLoopGroup(
        ExecutorTools.newThreadFactory( "executor" ), taskDeadlineRecorder::add ) ;
    executor.setIoRatio( 1 ) ;  // Silly, but worth trying to see what can go wrong.

    final Semaphore doneSemaphore = new Semaphore( 0 ) ;
    final ScheduledFuture< ? > scheduledFuture1 =
        executor.schedule( ( Runnable ) doneSemaphore::release, delay, timeUnit ) ;
    LOGGER.info( "Scheduled " + scheduledFuture1 + "." ) ;

    assertThat( taskDeadlineRecorder ).hasSize( 1 ) ;
    final Long nanoTime = taskDeadlineRecorder.get( 0 ) - ScheduledFutureTaskHack.START_TIME ;
    LOGGER.info( "Recorded " + nanoTime + " as nanoTime deadline for next task." ) ;

    assertThat( nanoTime ).isEqualTo( timeUnit.toNanos( delay ) ) ;
    final long pastDeadline = nanoTime + 1 ;
    nanoInstantSupplier.set( pastDeadline ) ;
    LOGGER.info(
        "Did set nanoTime to " + pastDeadline + ", past to Task's deadline. " +
        "Invocation of hacked nanoTime() returns " +
        ScheduledFutureTaskHack.invokeNanoTime() + "."
    ) ;
    LOGGER.info( "Now waiting for task completion ..." ) ;
    assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
  }

  /**
   * Fails when run after {@link #recordAndAdjust()} because JUnit doesn't reload classes for
   * each method inside a test class.
   */
  @Test
  public void noInstrumentation() throws InterruptedException {
    final NioEventLoopGroup executor =
        new NioEventLoopGroup( 1, ExecutorTools.newThreadFactory( "executor" ) ) ;
    final Semaphore doneSemaphore = new Semaphore( 0 ) ;
    executor.submit( () -> LOGGER.info( "Plain submission works!" ) ) ;
    final ScheduledFuture< ? > scheduledFuture =
        executor.schedule( ( Runnable ) doneSemaphore::release, 1, TimeUnit.SECONDS ) ;
    LOGGER.info( "Scheduled " + scheduledFuture + "." ) ;
    assertThat( doneSemaphore.tryAcquire( 3, TimeUnit.SECONDS ) ).isTrue() ;
  }


// =======
// Fixture
// =======

  private static final Logger LOGGER =
      LoggerFactory.getLogger( InstrumentedNioEventLoopGroupTest.class ) ;

  static {
    NettyTools.forceNettyClassesToLoad() ;
  }

}

I'm the author of the Chiron Framework, a WebSocket-based networking framework with a pure Java client and non-blocking two-factor authentication. It uses Netty a lot. Sadly, there are many JMockit-based tests that don't run reliably because the execution order may be non-deterministic (this is a problem with every piece of code that schedules tasks).

Asked Jun 12 '18 by Laurent Caillette


1 Answer

Oh guys, it was so obvious: I override every schedule* method, pile up the Runnable/Callable and other parameters in a data object, and add it to some queue. Then I trigger task execution explicitly from the test.
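A minimal sketch of that approach, using only the JDK: each schedule call is recorded as a data object in a priority queue ordered by deadline (ties broken by submission order), and nothing runs until the test advances a virtual clock. `DeterministicScheduler`, `advance` and `nextDeadlineNanos` are hypothetical names; in a real harness the overridden `schedule*` methods would delegate to something like this, and returning a proper Netty `ScheduledFuture` is left out.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical recording scheduler: nothing runs until the test advances the clock. */
final class DeterministicScheduler {

  /** Data object piling up what a schedule* call received. */
  private static final class PendingTask {
    final Runnable task ;
    final long deadlineNanos ;
    final long sequence ;  // Breaks ties so equal deadlines run in submission order.

    PendingTask( final Runnable task, final long deadlineNanos, final long sequence ) {
      this.task = task ;
      this.deadlineNanos = deadlineNanos ;
      this.sequence = sequence ;
    }
  }

  private final PriorityQueue< PendingTask > queue = new PriorityQueue<>(
      Comparator.comparingLong( ( PendingTask pending ) -> pending.deadlineNanos )
          .thenComparingLong( pending -> pending.sequence )
  ) ;
  private long nowNanos = 0 ;
  private long nextSequence = 0 ;

  /** What an overridden schedule( Runnable, long, TimeUnit ) would delegate to. */
  void schedule( final Runnable task, final long delay, final TimeUnit unit ) {
    final long delayNanos = Math.max( 0, unit.toNanos( delay ) ) ;  // Negative means "now".
    queue.add( new PendingTask( task, nowNanos + delayNanos, nextSequence ++ ) ) ;
  }

  /** Deadline of the next pending task, or null if there is none. */
  Long nextDeadlineNanos() {
    final PendingTask head = queue.peek() ;
    return head == null ? null : head.deadlineNanos ;
  }

  /** Fast-forwards the clock, running every task that becomes due, in order. */
  int advance( final long amount, final TimeUnit unit ) {
    final long targetNanos = nowNanos + unit.toNanos( amount ) ;
    int executed = 0 ;
    while( ! queue.isEmpty() && queue.peek().deadlineNanos <= targetNanos ) {
      final PendingTask next = queue.poll() ;
      nowNanos = next.deadlineNanos ;  // Tasks scheduled from inside see the right "now".
      next.task.run() ;
      executed ++ ;
    }
    nowNanos = targetNanos ;
    return executed ;
  }

  long nowNanos() {
    return nowNanos ;
  }
}
```

Because a task scheduled while another one runs goes into the same queue, a chain such as "disconnect, then reconnect 5 seconds later" completes within a single `advance` call when all its deadlines fall inside the window, and the test never waits for real time.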

Since the code that creates the tasks is mine, I decorate each task (which happens to be a method reference) with a tagging interface. Then the test can check that it runs the expected task.
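A method reference takes the type of whatever functional interface it is assigned to, so an empty interface extending `Runnable` is enough as a tag. A minimal sketch, with hypothetical names (`PingTask`, `ReconnectTask` and the stand-in `Connection` class are illustrations, not Chiron or Netty types):

```java
import java.util.ArrayList;
import java.util.List;

final class TaggedTasks {

  /** Hypothetical tagging interfaces: still plain Runnables, but the type tells what they do. */
  interface PingTask extends Runnable { }
  interface ReconnectTask extends Runnable { }

  /** Stand-in for the production class that schedules tasks. */
  static final class Connection {
    final List< String > sent = new ArrayList<>() ;

    void sendPing() { sent.add( "PING" ) ; }
    void reconnect() { sent.add( "RECONNECT" ) ; }

    // The target type turns each method reference into an instance of the tagging interface.
    PingTask pingTask() { return this::sendPing ; }
    ReconnectTask reconnectTask() { return this::reconnect ; }
  }
}
```

A test holding the recorded `Runnable` can then assert `task instanceof PingTask` before running it.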

Answered Oct 24 '22 by Laurent Caillette