From acfc583fd365a3cdb662dbfa6a609e6395ea4fa8 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 6 May 2021 09:51:24 +0300 Subject: [PATCH 1/9] notebooks update v1 --- .../customizing_what_happens_in_train.ipynb | 426 +++++++++++------- 1 file changed, 253 insertions(+), 173 deletions(-) diff --git a/examples/notebooks/customizing_what_happens_in_train.ipynb b/examples/notebooks/customizing_what_happens_in_train.ipynb index d843d7a853..6f7a1211d9 100644 --- a/examples/notebooks/customizing_what_happens_in_train.ipynb +++ b/examples/notebooks/customizing_what_happens_in_train.ipynb @@ -24,7 +24,7 @@ "\n", "A core principle of Catalyst is **progressive disclosure of complexity**. You should always be able to get into lower-level workflows in a gradual way. You shouldn't fall off a cliff if the high-level functionality doesn't exactly match your use case. You should be able to gain more control over the small details while retaing a commensurate amount of high-level convenience. \n", "\n", - "When you need to customize what `train()` does, you should **override the `_handle_batch` function of the `Runner` class**. This is the function that is called by `train()` for every batch of data. You will then be able to call `train()` as usual -- and it will be running your own learning algorithm.\n", + "When you need to customize what `train()` does, you should **override the `handle_batch` function of the `Runner` class**. This is the function that is called by `train()` for every batch of data. You will then be able to call `train()` as usual -- and it will be running your own learning algorithm.\n", "\n", "Note that this pattern does not prevent you from building models with the Functional API. You can do this with **any** PyTorch model.\n", "\n", @@ -66,7 +66,7 @@ }, "outputs": [], "source": [ - "!pip install catalyst==20.10.1\n", + "!pip install catalyst[ml]==21.4.2\n", "# don't forget to restart runtime for correct `PIL` work with Colab" ] }, @@ -96,7 +96,7 @@ "outputs": [], "source": [ "import catalyst\n", - "from catalyst import dl, utils\n", + "from catalyst import dl, metrics, utils\n", "catalyst.__version__" ] }, @@ -112,12 +112,14 @@ "Let's start from a simple example:\n", "\n", "- We create a new runner that subclasses `dl.Runner`.\n", - "- We just override the method `_handle_batch(self, batch)`.\n", - "- We do our train step with any possible custom logic.\n", + "- We just override the `handle_batch(self, batch)` method for custom train step logic \n", + "- And update `on_loader_start`/`on_loader_start` handlers for correct custom metrics aggregation.\n", "\n", "The input argument `batch` is what gets passed to fit as training data. If you pass a `torch.utils.data.DataLoader`, by calling `train(loaders={\"train\": loader, \"valid\": loader}, ...)`, then `batch` will be what gets yielded by `loader` at each batch.\n", "\n", - "In the body of the `_handle_batch` method, we implement a regular training update, similar to what you are already familiar with. Importantly, **we log metrics via `self.batch_metrics`**, which passes them to the loggers." + "In the body of the `handle_batch` method, we implement a regular training update, similar to what you are already familiar with. 
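For orientation, the update step boils down to the following sketch - a condensed preview of the full example in the next cell, not a drop-in replacement for it (the `x, y` unpacking and the MSE/MAE metrics are just placeholders for your own logic):

```python
def handle_batch(self, batch):
    x, y = batch                      # whatever your loader yields
    y_pred = self.model(x)            # forward pass
    loss = F.mse_loss(y_pred, y)      # any differentiable objective
    # log per-batch values; they are picked up by the loggers
    self.batch_metrics.update({"loss": loss, "mae": F.l1_loss(y_pred, y)})
    if self.is_train_loader:          # optimize only on the train loader
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()
```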
Importantly, **we log batch-based metrics via `self.batch_metrics`**, which passes them to the loggers.\n", + "\n", + "Addiionally, we have to use [`AdditiveValueMetric`](https://catalyst-team.github.io/catalyst/api/metrics.html#additivevaluemetric) during `on_loader_start` and `on_loader_start` for correct metrics aggregation for the whole loader. Importantly, **we log loader-based metrics via `self.loader_metrics`**, which passes them to the loggers." ] }, { @@ -134,27 +136,41 @@ "from torch.nn import functional as F\n", "\n", "class CustomRunner(dl.Runner):\n", - "\n", - " def _handle_batch(self, batch):\n", - " # Unpack the data. Its structure depends on your model and\n", - " # on what you pass to `train()`.\n", - " x, y = batch\n", - "\n", - " y_pred = self.model(x) # Forward pass\n", - "\n", - " # Compute the loss value\n", - " loss = F.mse_loss(y_pred, y)\n", - "\n", - " # Update metrics (includes the metric that tracks the loss)\n", - " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", - "\n", - " if self.is_train_loader:\n", - " # Compute gradients\n", - " loss.backward()\n", - " # Update weights\n", - " # (the optimizer is stored in `self.state`)\n", - " self.optimizer.step()\n", - " self.optimizer.zero_grad()" + " \n", + " def on_loader_start(self, runner):\n", + " super().on_loader_start(runner)\n", + " self.meters = {\n", + " key: metrics.AdditiveValueMetric(compute_on_call=False)\n", + " for key in [\"loss\", \"mae\"]\n", + " }\n", + "\n", + " def handle_batch(self, batch):\n", + " # Unpack the data. Its structure depends on your model and\n", + " # on what you pass to `train()`.\n", + " x, y = batch\n", + "\n", + " y_pred = self.model(x) # Forward pass\n", + "\n", + " # Compute the loss value\n", + " loss = F.mse_loss(y_pred, y)\n", + "\n", + " # Update metrics (includes the metric that tracks the loss)\n", + " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.meters[key].update(self.batch_metrics[key].item(), self.batch_size)\n", + "\n", + " if self.is_train_loader:\n", + " # Compute gradients\n", + " loss.backward()\n", + " # Update weights\n", + " # (the optimizer is stored in `self.state`)\n", + " self.optimizer.step()\n", + " self.optimizer.zero_grad()\n", + " \n", + " def on_loader_end(self, runner):\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.loader_metrics[key] = self.meters[key].compute()[0]\n", + " super().on_loader_end(runner)" ] }, { @@ -208,7 +224,7 @@ "criterion = torch.nn.MSELoss()\n", "optimizer = torch.optim.Adam(model.parameters())\n", "\n", - "# Just use `train` as usual\n", + "# and use `train`\n", "runner = CustomRunner()\n", "runner.train(\n", " model=model, \n", @@ -229,7 +245,7 @@ "source": [ "## Going high-level\n", "\n", - "Naturally, you could skip a loss function backward in `_handle_batch()`, and instead do everything with `Callbacks` in `train` params. Likewise for metrics. Here's a high-level example, that only uses `_handle_batch()` for model forward pass and metrics computation:" + "Naturally, you could skip a loss function backward in `handle_batch()`, and instead do everything with `Callbacks` in `train` params. Likewise for metrics. 
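In other words, the manual backward/step/zero_grad block from the previous runner can be delegated to `dl.OptimizerCallback`, roughly like this (a sketch only; the complete argument list is shown in the example right below):

```python
# instead of calling loss.backward() / optimizer.step() inside handle_batch(),
# let a callback drive the optimization step for the metric you choose
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    loaders=loaders,
    callbacks={
        "optimizer": dl.OptimizerCallback(metric_key="loss"),
    },
)
```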
Here's a high-level example, that only uses `handle_batch()` for model forward pass and metrics computation:" ] }, { @@ -245,20 +261,34 @@ "\n", "\n", "class CustomRunner(dl.Runner):\n", + " \n", + " def on_loader_start(self, runner):\n", + " super().on_loader_start(runner)\n", + " self.meters = {\n", + " key: metrics.AdditiveValueMetric(compute_on_call=False)\n", + " for key in [\"loss\", \"mae\"]\n", + " }\n", "\n", - " def _handle_batch(self, batch):\n", - " # Unpack the data. Its structure depends on your model and\n", - " # on what you pass to `train()`.\n", - " x, y = batch\n", + " def handle_batch(self, batch):\n", + " # Unpack the data. Its structure depends on your model and\n", + " # on what you pass to `train()`.\n", + " x, y = batch\n", "\n", - " y_pred = self.model(x) # Forward pass\n", + " y_pred = self.model(x) # Forward pass\n", "\n", - " # Compute the loss value\n", - " # (the criterion is stored in `self.state` also)\n", - " loss = self.criterion(y_pred, y)\n", + " # Compute the loss value\n", + " # (the criterion is stored in `self.state` also)\n", + " loss = self.criterion(y_pred, y)\n", "\n", - " # Update metrics (includes the metric that tracks the loss)\n", - " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", + " # Update metrics (includes the metric that tracks the loss)\n", + " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.meters[key].update(self.batch_metrics[key].item(), self.batch_size)\n", + "\n", + " def on_loader_end(self, runner):\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.loader_metrics[key] = self.meters[key].compute()[0]\n", + " super().on_loader_end(runner)\n", "\n", "\n", "# Construct custom data\n", @@ -289,8 +319,10 @@ " metric_key=\"loss\", # you can also pass 'mae' to optimize it instead\n", " # generaly, you can optimize any differentiable metric from `runner.batch_metrics`\n", " accumulation_steps=1, # also you can pass any number of steps for gradient accumulation\n", - " grad_clip_params=None, # or yor use `{\"func\": \"clip_grad_norm_\", max_norm=1, norm_type=2}`\n", - " # or `{\"func\": \"clip_grad_value_\", clip_value=1}`\n", + " grad_clip_fn=None, # or you can use `grad_clip_fn=nn.utils.clip_grad_norm_`\n", + " grad_clip_params=None, # with `grad_clip_params={max_norm=1, norm_type=2}`\n", + " # or `grad_clip_fn=nn.utils.clip_grad_value_`\n", + " # with `grad_clip_params={clip_value=1}`\n", " # for gradient clipping during training!\n", " # for more information about gradient clipping please follow pytorch docs\n", " # https://pytorch.org/docs/stable/nn.html#clip-grad-norm\n", @@ -311,8 +343,8 @@ "Let's go even deeper! Could we transfer different metrics/criterions computation to `Callbacks` too? Of course! If you want to support different losses, you'd simply do the following:\n", "\n", "- Do your model forward pass as usual.\n", - "- Save model input to `runner.input` and model output to `runner.output`, so Callbacks can find it.\n", - "- Add extra callbacks, that will use data from `runner.input` and `runner.output` for computation.\n", + "- Save all batch-based artefacts to `self.batch`, so Callbacks can find it.\n", + "- Add extra callbacks, that will use data from `runner.batch` during training.\n", "\n", "That's it. That's the list. Let's see the example:" ] @@ -331,18 +363,16 @@ "\n", "class CustomRunner(dl.Runner):\n", "\n", - " def _handle_batch(self, batch):\n", - " # Unpack the data. 
Its structure depends on your model and\n", - " # on what you pass to `train()`.\n", - " x, y = batch\n", + " def handle_batch(self, batch):\n", + " # Unpack the data. Its structure depends on your model and\n", + " # on what you pass to `train()`.\n", + " x, y = batch\n", "\n", - " y_pred = self.model(x) # Forward pass\n", - " \n", - " # pass network input to state `input`\n", - " self.batch = {\"features\": x, \"targets\": y}\n", - " # and network output to state `output`\n", - " # we recommend to use key-value storage to make it Callbacks-friendly\n", - " self.output = {\"logits\": y_pred}\n", + " y_pred = self.model(x) # Forward pass\n", + "\n", + " # pass all batch-based artefacts to `self.batch`\n", + " # we recommend to use key-value storage to make it Callbacks-friendly\n", + " self.batch = {\"features\": x, \"targets\": y, \"logits\": y_pred}\n", "\n", "\n", "# Construct custom data\n", @@ -369,20 +399,29 @@ " verbose=True,\n", " timeit=False,\n", " callbacks={\n", + " # alias for \n", + " # `runner.batch_metrics[metric_key] = \\\n", + " # runner.criterion[criterion_key](runner.batch[input_key], runner.batch[target_key])`\n", " \"criterion\": dl.CriterionCallback( # special Callback for criterion computation\n", - " input_key=\"targets\", # `input_key` specifies correct labels (or `y_true`) from `runner.input` \n", - " output_key=\"logits\", # `output_key` specifies model predictions (`y_pred`) from `runner.output`\n", - " prefix=\"loss\", # `prefix` - key to use with `runner.batch_metrics`\n", - " ), # alias for `runner.batch_metrics[prefix] = runner.criterion(runner.output[output_key], runner.input[input_key])`\n", - " \"metric\": dl.MetricCallback( # special Callback for metrics computation\n", - " input_key=\"targets\", # shares logic with `CriterionCallback`\n", - " output_key=\"logits\",\n", - " prefix=\"loss_mae\",\n", - " metric_fn=F.l1_loss, # metric function to use\n", - " ), # alias for `runner.batch_metrics[prefix] = metric_fn(runner.output[output_key], runner.input[input_key])`\n", + " input_key=\"logits\", # `input_key` specifies model predictions (`y_pred`) from `runner.batch`\n", + " target_key=\"targets\", # `target_key` specifies correct labels (or `y_true`) from `runner.batch` \n", + " metric_key=\"loss\", # `metric_key` - key to use with `runner.batch_metrics`\n", + " criterion_key=None, # `criterion_key` specifies criterion in case of key-value runner.criterion\n", + " # if `criterion_key=None`, runner.criterion used for computation\n", + " ), \n", + " # alias for \n", + " # `runner.batch_metrics[metric_key] = \\\n", + " # metric_fn(runner.batch[input_key], runner.batch[target_key])`\n", + " \"metric\": dl.FunctionalMetricCallback( # special Callback for metrics computation\n", + " input_key=\"logits\", # the same logic as with `CriterionCallback`\n", + " target_key=\"targets\", # the same logic as with `CriterionCallback`\n", + " metric_key=\"loss_mae\", # the same logic as with `CriterionCallback`\n", + " metric_fn=F.l1_loss, # metric function to use\n", + " ), \n", " \"optimizer\": dl.OptimizerCallback(\n", " metric_key=\"loss\", \n", " accumulation_steps=1,\n", + " grad_clip_fn=None,\n", " grad_clip_params=None,\n", " )\n", " }\n", @@ -396,7 +435,7 @@ "## Simplify it a bit - SupervisedRunner\n", "\n", "But can we simplify last example a bit?
\n", - "What if we know, that we are going to train `Supervised` model, that will take some `features` in and output some `logits` back?
\n", + "What if we know, that we are going to train `supervised` model, that will take some `features` in and output some `logits` back?
\n", "Looks like commom case... could we automate it? Let's check it out!" ] }, @@ -425,14 +464,16 @@ "\n", "# Just use `train` as usual\n", "runner = dl.SupervisedRunner( # `SupervisedRunner` works with any model like `some_output = model(some_input)`\n", - " input_key=\"features\", # if your dataloader yields (x, y) tuple, it will be transformed to \n", - " output_key=\"logits\", # {input_key: x, input_target_key: y} and stored to runner.input\n", - " input_target_key=\"targets\", # then the model will be used like\n", - ") # runner.output = model(runner.input[input_key])\n", - " # loss computation suppose to looks like\n", - " # loss = criterion(runner.output[input_target_key], runner.output[output_key])\n", - " # and stored to `runner.batch_metrics['loss']`\n", - "\n", + " input_key=\"features\", # if your dataloader yields (x, y) tuple, it will be transformed to \n", + " output_key=\"logits\", # {input_key: x, target_key: y} and stored to runner.batch\n", + " target_key=\"targets\", # then the model will be used like\n", + " loss_key=\"loss\", # runner.batch[runner.output_key] = model(runner.batch[input_key])\n", + ") # loss computation suppose to looks like\n", + " # loss = criterion(runner.batch[runner.output_key], runner.batch[runner.target_key])\n", + " # and stored to `runner.batch_metrics[runner.loss_key]`\n", + "\n", + "# thanks to prespecified `input_key`, `output_key`, `target_key` and `loss_key`\n", + "# `SupervisedRunner` automatically adds required `CriterionCallback` and `OptimizerCallback`\n", "runner.train(\n", " model=model, \n", " optimizer=optimizer,\n", @@ -443,22 +484,23 @@ " verbose=True,\n", " timeit=False,\n", " callbacks={\n", - " \"criterion_mse\": dl.CriterionCallback(\n", - " input_key=\"targets\",\n", - " output_key=\"logits\",\n", - " prefix=\"loss\",\n", - " ),\n", - " \"criterion_mae\": dl.MetricCallback(\n", - " input_key=\"targets\",\n", - " output_key=\"logits\",\n", - " prefix=\"mae\",\n", + "# \"criterion_mse\": dl.CriterionCallback(\n", + "# input_key=\"logits\",\n", + "# target_key=\"targets\",\n", + "# metric_key=\"loss\",\n", + "# ),\n", + " \"criterion_mae\": dl.FunctionalMetricCallback(\n", + " input_key=\"logits\",\n", + " target_key=\"targets\",\n", + " metric_key=\"mae\",\n", " metric_fn=F.l1_loss,\n", " ),\n", - " \"optimizer\": dl.OptimizerCallback(\n", - " metric_key=\"loss\", \n", - " accumulation_steps=1,\n", - " grad_clip_params=None,\n", - " )\n", + "# \"optimizer\": dl.OptimizerCallback(\n", + "# metric_key=\"loss\", \n", + "# accumulation_steps=1,\n", + "# grad_clip_fn=None,\n", + "# grad_clip_params=None,\n", + "# )\n", " }\n", ")" ] @@ -488,29 +530,44 @@ "\n", "class CustomRunner(dl.Runner):\n", " \n", - " def predict_batch(self, batch): # here is the trick\n", - " return self.model(batch[0].to(self.device)) # you can write any prediciton logic here\n", - "\n", - " def _handle_batch(self, batch): # our first time example\n", - " # Unpack the data. 
Its structure depends on your model and\n", - " # on what you pass to `train()`.\n", - " x, y = batch\n", - "\n", - " y_pred = self.model(x) # Forward pass\n", - "\n", - " # Compute the loss value\n", - " loss = F.mse_loss(y_pred, y)\n", - "\n", - " # Update metrics (includes the metric that tracks the loss)\n", - " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", - "\n", - " if self.is_train_loader:\n", - " # Compute gradients\n", - " loss.backward()\n", - " # Update weights\n", - " # (the optimizer is stored in `self.state`)\n", - " self.optimizer.step()\n", - " self.optimizer.zero_grad()" + " def predict_batch(self, batch): # here is the trick\n", + " return self.model(batch[0].to(self.device)) # you can write any prediciton logic here\n", + "\n", + " # our first time example\n", + " def on_loader_start(self, runner):\n", + " super().on_loader_start(runner)\n", + " self.meters = {\n", + " key: metrics.AdditiveValueMetric(compute_on_call=False)\n", + " for key in [\"loss\", \"mae\"]\n", + " }\n", + "\n", + " def handle_batch(self, batch):\n", + " # Unpack the data. Its structure depends on your model and\n", + " # on what you pass to `train()`.\n", + " x, y = batch\n", + "\n", + " y_pred = self.model(x) # Forward pass\n", + "\n", + " # Compute the loss value\n", + " loss = F.mse_loss(y_pred, y)\n", + "\n", + " # Update metrics (includes the metric that tracks the loss)\n", + " self.batch_metrics.update({\"loss\": loss, \"mae\": F.l1_loss(y_pred, y)})\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.meters[key].update(self.batch_metrics[key].item(), self.batch_size)\n", + "\n", + " if self.is_train_loader:\n", + " # Compute gradients\n", + " loss.backward()\n", + " # Update weights\n", + " # (the optimizer is stored in `self.state`)\n", + " self.optimizer.step()\n", + " self.optimizer.zero_grad()\n", + " \n", + " def on_loader_end(self, runner):\n", + " for key in [\"loss\", \"mae\"]:\n", + " self.loader_metrics[key] = self.meters[key].compute()[0]\n", + " super().on_loader_end(runner)" ] }, { @@ -551,7 +608,7 @@ "prediction = runner.predict_batch(next(iter(loader))) # let's sample first batch from loader\n", "# or `loader` prediction\n", "for prediction in runner.predict_loader(loader=loader):\n", - " assert prediction.detach().cpu().numpy().shape[-1] == 1 # as we have 1-class regression" + " assert prediction.detach().cpu().numpy().shape[-1] == 1 # as we have 1-class regression" ] }, { @@ -569,10 +626,17 @@ "metadata": {}, "outputs": [], "source": [ - "# you can trace your model through batch 'mode'\n", - "traced_model = runner.trace(batch=next(iter(loader)))\n", - "# or loader 'mode' - it will take first batch automatically\n", - "traced_model = runner.trace(loader=loader)" + "features_batch = next(iter(loaders[\"valid\"]))[0]\n", + "# model stochastic weight averaging\n", + "model.load_state_dict(utils.get_averaged_weights_by_path_mask(logdir=\"./logs\", path_mask=\"*.pth\"))\n", + "# model tracing\n", + "utils.trace_model(model=runner.model, batch=features_batch)\n", + "# model quantization\n", + "utils.quantize_model(model=runner.model)\n", + "# model pruning\n", + "utils.prune_model(model=runner.model, pruning_fn=\"l1_unstructured\", amount=0.8)\n", + "# onnx export, catalyst[onnx] or catalyst[onnx-gpu] required\n", + "utils.onnx_export(model=runner.model, batch=features_batch, file=\"./logs/mnist.onnx\", verbose=True)" ] }, { @@ -602,7 +666,7 @@ "import torch\n", "from torch import nn\n", "from torch.nn import functional as F\n", - "from 
catalyst.contrib.nn import GlobalMaxPool2d, Flatten, Lambda\n", + "from catalyst.contrib.nn.modules import Flatten, GlobalMaxPool2d, Lambda\n", "\n", "# Create the discriminator\n", "discriminator = nn.Sequential(\n", @@ -612,7 +676,7 @@ " nn.LeakyReLU(0.2, inplace=True),\n", " GlobalMaxPool2d(),\n", " Flatten(),\n", - " nn.Linear(128, 1)\n", + " nn.Linear(128, 1),\n", ")\n", "\n", "# Create the generator\n", @@ -631,15 +695,12 @@ ")\n", "\n", "# Final model\n", - "model = {\n", - " \"generator\": generator,\n", - " \"discriminator\": discriminator,\n", - "}\n", - "\n", + "model = {\"generator\": generator, \"discriminator\": discriminator}\n", + "criterion = {\"generator\": nn.BCEWithLogitsLoss(), \"discriminator\": nn.BCEWithLogitsLoss()}\n", "optimizer = {\n", " \"generator\": torch.optim.Adam(generator.parameters(), lr=0.0003, betas=(0.5, 0.999)),\n", " \"discriminator\": torch.optim.Adam(discriminator.parameters(), lr=0.0003, betas=(0.5, 0.999)),\n", - "}\n" + "}" ] }, { @@ -649,7 +710,7 @@ "id": "POY42XRf5Jbd" }, "source": [ - "Here's a feature-complete `GANRunner`, overriding `predict_batch()` to use its own signature, and implementing the entire GAN algorithm in 16 lines in `_handle_batch`:" + "Here's a feature-complete `GANRunner`, overriding `predict_batch()` to use its own signature, and implementing the entire GAN algorithm in 16 lines in `handle_batch`:" ] }, { @@ -664,52 +725,54 @@ "source": [ "class GANRunner(dl.Runner):\n", " \n", - " def _init(self, latent_dim: int):\n", - " self.latent_dim = latent_dim\n", - " self.experiment = None # spoiler for next lesson ;)\n", - "\n", - " def predict_batch(self, batch):\n", - " random_latent_vectors = torch.randn(1, self.latent_dim).to(self.device)\n", - " generated_images = self.model[\"generator\"](random_latent_vectors)\n", - " return generated_images\n", - "\n", - " def _handle_batch(self, batch):\n", - " real_images, _ = batch\n", - " batch_metrics = {}\n", - " \n", - " # Sample random points in the latent space\n", - " batch_size = real_images.shape[0]\n", - " random_latent_vectors = torch.randn(batch_size, self.latent_dim).to(self.device)\n", - " \n", - " # Decode them to fake images\n", - " generated_images = self.model[\"generator\"](random_latent_vectors).detach()\n", - " # Combine them with real images\n", - " combined_images = torch.cat([generated_images, real_images])\n", - " \n", - " # Assemble labels discriminating real from fake images\n", - " labels = torch.cat([\n", - " torch.ones((batch_size, 1)), torch.zeros((batch_size, 1))\n", - " ]).to(self.device)\n", - " # Add random noise to the labels - important trick!\n", - " labels += 0.05 * torch.rand(labels.shape).to(self.device)\n", - " \n", - " # Train the discriminator\n", - " predictions = self.model[\"discriminator\"](combined_images)\n", - " batch_metrics[\"loss_discriminator\"] = \\\n", - " F.binary_cross_entropy_with_logits(predictions, labels)\n", - " \n", - " # Sample random points in the latent space\n", - " random_latent_vectors = torch.randn(batch_size, self.latent_dim).to(self.device)\n", - " # Assemble labels that say \"all real images\"\n", - " misleading_labels = torch.zeros((batch_size, 1)).to(self.device)\n", - " \n", - " # Train the generator\n", - " generated_images = self.model[\"generator\"](random_latent_vectors)\n", - " predictions = self.model[\"discriminator\"](generated_images)\n", - " batch_metrics[\"loss_generator\"] = \\\n", - " F.binary_cross_entropy_with_logits(predictions, misleading_labels)\n", - " \n", - " 
self.batch_metrics.update(**batch_metrics)" + " def __init__(self, latent_dim: int):\n", + " super().__init__()\n", + " self.latent_dim = latent_dim\n", + "\n", + " def predict_batch(self, batch):\n", + " batch_size = 1\n", + " # Sample random points in the latent space\n", + " random_latent_vectors = torch.randn(batch_size, self.latent_dim).to(self.device)\n", + " # Decode them to fake images\n", + " generated_images = self.model[\"generator\"](random_latent_vectors).detach()\n", + " return generated_images\n", + "\n", + " def handle_batch(self, batch):\n", + " real_images, _ = batch\n", + " batch_size = real_images.shape[0]\n", + "\n", + " # Sample random points in the latent space\n", + " random_latent_vectors = torch.randn(batch_size, self.latent_dim).to(self.device)\n", + "\n", + " # Decode them to fake images\n", + " generated_images = self.model[\"generator\"](random_latent_vectors).detach()\n", + " # Combine them with real images\n", + " combined_images = torch.cat([generated_images, real_images])\n", + "\n", + " # Assemble labels discriminating real from fake images\n", + " labels = \\\n", + " torch.cat([torch.ones((batch_size, 1)), torch.zeros((batch_size, 1))]).to(self.device)\n", + " # Add random noise to the labels - important trick!\n", + " labels += 0.05 * torch.rand(labels.shape).to(self.device)\n", + "\n", + " # Discriminator forward\n", + " combined_predictions = self.model[\"discriminator\"](combined_images)\n", + "\n", + " # Sample random points in the latent space\n", + " random_latent_vectors = torch.randn(batch_size, self.latent_dim).to(self.device)\n", + " # Assemble labels that say \"all real images\"\n", + " misleading_labels = torch.zeros((batch_size, 1)).to(self.device)\n", + "\n", + " # Generator forward\n", + " generated_images = self.model[\"generator\"](random_latent_vectors)\n", + " generated_predictions = self.model[\"discriminator\"](generated_images)\n", + "\n", + " self.batch = {\n", + " \"combined_predictions\": combined_predictions,\n", + " \"labels\": labels,\n", + " \"generated_predictions\": generated_predictions,\n", + " \"misleading_labels\": misleading_labels,\n", + " }" ] }, { @@ -732,8 +795,8 @@ "source": [ "import os\n", "from torch.utils.data import DataLoader\n", - "from catalyst.data.cv import ToTensor\n", "from catalyst.contrib.datasets import MNIST\n", + "from catalyst.data.transforms import ToTensor\n", "\n", "loaders = {\n", " \"train\": DataLoader(\n", @@ -743,20 +806,37 @@ "\n", "runner = GANRunner(latent_dim=latent_dim)\n", "runner.train(\n", - " model=model, \n", + " model=model,\n", + " criterion=criterion,\n", " optimizer=optimizer,\n", " loaders=loaders,\n", " callbacks=[\n", + " dl.CriterionCallback(\n", + " input_key=\"combined_predictions\",\n", + " target_key=\"labels\",\n", + " metric_key=\"loss_discriminator\",\n", + " criterion_key=\"discriminator\",\n", + " ),\n", + " dl.CriterionCallback(\n", + " input_key=\"generated_predictions\",\n", + " target_key=\"misleading_labels\",\n", + " metric_key=\"loss_generator\",\n", + " criterion_key=\"generator\",\n", + " ),\n", " dl.OptimizerCallback(\n", + " model_key=\"generator\", \n", " optimizer_key=\"generator\", \n", " metric_key=\"loss_generator\"\n", " ),\n", " dl.OptimizerCallback(\n", + " model_key=\"discriminator\", \n", " optimizer_key=\"discriminator\", \n", " metric_key=\"loss_discriminator\"\n", " ),\n", " ],\n", + " valid_loader=\"train\",\n", " valid_metric=\"loss_generator\",\n", + " minimize_valid_metric=True,\n", " num_epochs=20,\n", " verbose=True,\n", " 
logdir=\"./logs_gan\",\n", @@ -827,7 +907,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.7" + "version": "3.7.9" }, "pycharm": { "stem_cell": { From dfb3d687738f25bc0e807022a48e91110ae78e21 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 6 May 2021 10:06:45 +0300 Subject: [PATCH 2/9] notebooks update v2 --- .../customizing_what_happens_in_train.ipynb | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/examples/notebooks/customizing_what_happens_in_train.ipynb b/examples/notebooks/customizing_what_happens_in_train.ipynb index 6f7a1211d9..76e9709202 100644 --- a/examples/notebooks/customizing_what_happens_in_train.ipynb +++ b/examples/notebooks/customizing_what_happens_in_train.ipynb @@ -473,7 +473,9 @@ " # and stored to `runner.batch_metrics[runner.loss_key]`\n", "\n", "# thanks to prespecified `input_key`, `output_key`, `target_key` and `loss_key`\n", - "# `SupervisedRunner` automatically adds required `CriterionCallback` and `OptimizerCallback`\n", + "# `SupervisedRunner` automatically adds required `CriterionCallback` and `OptimizerCallback`\n", + "# moreover, with specified `logdir`, `valid_loader` and `valid_metric`\n", + "# `SupervisedRunner` automatically adds `CheckpointCallback` and tracks best performing based on selected metric\n", "runner.train(\n", " model=model, \n", " optimizer=optimizer,\n", @@ -483,6 +485,9 @@ " num_epochs=3,\n", " verbose=True,\n", " timeit=False,\n", + " valid_loader=\"valid\", # `loader_key` from loaders to use for model selection\n", + " valid_metric=\"loss\", # `metric_key` to use for model selection\n", + " logdir=\"./logs_supervised\", # logdir to store models checkpoints\n", " callbacks={\n", "# \"criterion_mse\": dl.CriterionCallback(\n", "# input_key=\"logits\",\n", @@ -601,8 +606,10 @@ " num_epochs=3,\n", " verbose=True,\n", " timeit=False,\n", - " load_best_on_end=True, # flag to load best model at the end of the training process\n", - " logdir=\"./logs\", # logdir to store models checkpoints (required for `load_best_on_end`)\n", + " valid_loader=\"valid\", # `loader_key` from loaders to use for model selection\n", + " valid_metric=\"loss\", # `metric_key` to use for model selection\n", + " load_best_on_end=True, # flag to load best model at the end of the training process\n", + " logdir=\"./logs\", # logdir to store models checkpoints (required for `load_best_on_end`)\n", ")\n", "# and use `batch` prediciton\n", "prediction = runner.predict_batch(next(iter(loader))) # let's sample first batch from loader\n", @@ -626,7 +633,7 @@ "metadata": {}, "outputs": [], "source": [ - "features_batch = next(iter(loaders[\"valid\"]))[0]\n", + "features_batch = next(iter(loaders[\"valid\"]))[0].to(runner.device)\n", "# model stochastic weight averaging\n", "model.load_state_dict(utils.get_averaged_weights_by_path_mask(logdir=\"./logs\", path_mask=\"*.pth\"))\n", "# model tracing\n", From 48d7267e8ec3530191d470d061224a791dba51be Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 6 May 2021 19:35:42 +0300 Subject: [PATCH 3/9] notebooks update v2 --- examples/notebooks/customizing_what_happens_in_train.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/notebooks/customizing_what_happens_in_train.ipynb b/examples/notebooks/customizing_what_happens_in_train.ipynb index 76e9709202..507887dc19 100644 --- a/examples/notebooks/customizing_what_happens_in_train.ipynb +++ b/examples/notebooks/customizing_what_happens_in_train.ipynb @@ 
-643,7 +643,7 @@ "# model pruning\n", "utils.prune_model(model=runner.model, pruning_fn=\"l1_unstructured\", amount=0.8)\n", "# onnx export, catalyst[onnx] or catalyst[onnx-gpu] required\n", - "utils.onnx_export(model=runner.model, batch=features_batch, file=\"./logs/mnist.onnx\", verbose=True)" + "# utils.onnx_export(model=runner.model, batch=features_batch, file=\"./logs/mnist.onnx\", verbose=True)" ] }, { From 3e25fd7dcf7e7783fd19a81c3761ac60b958b19c Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Thu, 6 May 2021 19:39:10 +0300 Subject: [PATCH 4/9] notebooks update v2 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63d50655e4..01b18fb764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Fixed -- +- customizing what happens in `train()` notebook ([#1203](https://github.com/catalyst-team/catalyst/pull/1203)) ## [21.04.2] - 2021-04-30 From c275ba6fc25940eb2dbe9d10d7069ad2d64d1f1f Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Fri, 7 May 2021 08:48:08 +0300 Subject: [PATCH 5/9] minimal RL --- examples/reinforcement_learning/.DS_Store | Bin 0 -> 6148 bytes examples/reinforcement_learning/README.md | 16 + examples/reinforcement_learning/ddpg.py | 395 +++++++++++++++++++ examples/reinforcement_learning/dqn.py | 359 +++++++++++++++++ examples/reinforcement_learning/reinforce.py | 264 +++++++++++++ 5 files changed, 1034 insertions(+) create mode 100644 examples/reinforcement_learning/.DS_Store create mode 100644 examples/reinforcement_learning/README.md create mode 100644 examples/reinforcement_learning/ddpg.py create mode 100644 examples/reinforcement_learning/dqn.py create mode 100644 examples/reinforcement_learning/reinforce.py diff --git a/examples/reinforcement_learning/.DS_Store b/examples/reinforcement_learning/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..b1f5e4e37260b95ff18fff0483ab75a80c0cafbe GIT binary patch literal 6148 zcmeHKJ5B>Z4D~>YD4|G6fy9a%qzwg2w45LZ0Od=hO*RBUy#gW8a+35^NE`qP&cF#c z0MBC!cC!f~4FY6G_S>DYXFN}ubxcIAdC{#C)rhDJWh_--76{L?7Gy(5wt&vBk%|IY{6n=xPv{3`}rDQZPc97$_y w;c#4Q1Lz5qh5b@N9|EK0*gtS7-iCU?T)+>&P%#z62*myfcpA(Y13${ZJ8U_OIRF3v literal 0 HcmV?d00001 diff --git a/examples/reinforcement_learning/README.md b/examples/reinforcement_learning/README.md new file mode 100644 index 0000000000..3320740b33 --- /dev/null +++ b/examples/reinforcement_learning/README.md @@ -0,0 +1,16 @@ +# Requirements +```bash +pip install catalyst gym +``` + +# Run +```bash +# DQN +python dqn.py + +# DDPG +python ddpg.py + +# REINFORCE +python reinforce.py +``` \ No newline at end of file diff --git a/examples/reinforcement_learning/ddpg.py b/examples/reinforcement_learning/ddpg.py new file mode 100644 index 0000000000..5cde4e1ff8 --- /dev/null +++ b/examples/reinforcement_learning/ddpg.py @@ -0,0 +1,395 @@ +from typing import Iterator, Optional, Sequence, Tuple +from collections import deque, namedtuple + +import gym +import numpy as np +import torch +import torch.nn as nn +from torch.utils.data import DataLoader +from torch.utils.data.dataset import IterableDataset + +from catalyst import dl, utils + +# Off-policy common + +Transition = namedtuple( + "Transition", field_names=["state", "action", "reward", "done", "next_state"] +) + + +class ReplayBuffer: + def __init__(self, capacity: int): + self.buffer = deque(maxlen=capacity) + + def append(self, transition: Transition): + 
self.buffer.append(transition) + + def sample(self, size: int) -> Sequence[np.array]: + indices = np.random.choice(len(self.buffer), size, replace=size > len(self.buffer)) + states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices]) + states, actions, rewards, dones, next_states = ( + np.array(states, dtype=np.float32), + np.array(actions, dtype=np.int64), + np.array(rewards, dtype=np.float32), + np.array(dones, dtype=np.bool), + np.array(next_states, dtype=np.float32), + ) + return states, actions, rewards, dones, next_states + + def __len__(self) -> int: + return len(self.buffer) + + +# as far as RL does not have some predefined dataset, +# we need to specify epoch length by ourselfs +class ReplayDataset(IterableDataset): + def __init__(self, buffer: ReplayBuffer, epoch_size: int = int(1e3)): + self.buffer = buffer + self.epoch_size = epoch_size + + def __iter__(self) -> Iterator[Sequence[np.array]]: + states, actions, rewards, dones, next_states = self.buffer.sample(self.epoch_size) + for i in range(len(dones)): + yield states[i], actions[i], rewards[i], dones[i], next_states[i] + + def __len__(self) -> int: + return self.epoch_size + + +def soft_update(target: nn.Module, source: nn.Module, tau: float): + """Updates the target data with smoothing by ``tau``""" + for target_param, param in zip(target.parameters(), source.parameters()): + target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau) + + +# DDPG + + +class NormalizedActions(gym.ActionWrapper): + def action(self, action: float) -> float: + low_bound = self.action_space.low + upper_bound = self.action_space.high + + action = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound) + action = np.clip(action, low_bound, upper_bound) + + return action + + def _reverse_action(self, action: float) -> float: + low_bound = self.action_space.low + upper_bound = self.action_space.high + + action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1 + action = np.clip(action, low_bound, upper_bound) + + return action + + +def get_action( + env, network: nn.Module, state: np.array, sigma: Optional[float] = None +) -> np.array: + state = torch.tensor(state, dtype=torch.float32).unsqueeze(0) + action = network(state).detach().cpu().numpy()[0] + if sigma is not None: + action = np.random.normal(action, sigma) + return action + + +def generate_session( + env, + network: nn.Module, + sigma: Optional[float] = None, + replay_buffer: Optional[ReplayBuffer] = None, +) -> Tuple[float, int]: + total_reward = 0 + state = env.reset() + + for t in range(env.spec.max_episode_steps): + action = get_action(env, network, state=state, sigma=sigma) + next_state, reward, done, _ = env.step(action) + + if replay_buffer is not None: + transition = Transition(state, action, reward, done, next_state) + replay_buffer.append(transition) + + total_reward += reward + state = next_state + if done: + break + + return total_reward, t + + +def generate_sessions( + env, + network: nn.Module, + sigma: Optional[float] = None, + replay_buffer: Optional[ReplayBuffer] = None, + num_sessions: int = 100, +) -> Tuple[float, int]: + sessions_reward, sessions_steps = 0, 0 + for i_episone in range(num_sessions): + r, t = generate_session( + env=env, network=network, sigma=sigma, replay_buffer=replay_buffer, + ) + sessions_reward += r + sessions_steps += t + return sessions_reward, sessions_steps + + +def get_network_actor(env): + inner_fn = utils.get_optimal_inner_init(nn.ReLU) + outer_fn = utils.outer_init + + network = 
torch.nn.Sequential( + nn.Linear(env.observation_space.shape[0], 400), nn.ReLU(), nn.Linear(400, 300), nn.ReLU(), + ) + head = torch.nn.Sequential(nn.Linear(300, 1), nn.Tanh()) + + network.apply(inner_fn) + head.apply(outer_fn) + + return torch.nn.Sequential(network, head) + + +def get_network_critic(env): + inner_fn = utils.get_optimal_inner_init(nn.LeakyReLU) + outer_fn = utils.outer_init + + network = torch.nn.Sequential( + nn.Linear(env.observation_space.shape[0] + 1, 400), + nn.LeakyReLU(0.01), + nn.Linear(400, 300), + nn.LeakyReLU(0.01), + ) + head = nn.Linear(300, 1) + + network.apply(inner_fn) + head.apply(outer_fn) + + return torch.nn.Sequential(network, head) + + +# Catalyst + + +class GameCallback(dl.Callback): + def __init__( + self, + *, + env, + replay_buffer: ReplayBuffer, + session_period: int, + sigma: float, + actor_key: str, + ): + super().__init__(order=0) + self.env = env + self.replay_buffer = replay_buffer + self.session_period = session_period + self.sigma = sigma + self.actor_key = actor_key + + def on_stage_start(self, runner: dl.IRunner): + self.actor = runner.model[self.actor_key] + + self.actor.eval() + generate_sessions( + env=self.env, + network=self.actor, + sigma=self.sigma, + replay_buffer=self.replay_buffer, + num_sessions=1000, + ) + self.actor.train() + + def on_epoch_start(self, runner: dl.IRunner): + self.session_counter = 0 + self.session_steps = 0 + + def on_batch_end(self, runner: dl.IRunner): + if runner.global_batch_step % self.session_period == 0: + self.actor.eval() + + session_reward, session_steps = generate_session( + env=self.env, + network=self.actor, + sigma=self.sigma, + replay_buffer=self.replay_buffer, + ) + + self.session_counter += 1 + self.session_steps += session_steps + + runner.batch_metrics.update({"s_reward": session_reward}) + runner.batch_metrics.update({"s_steps": session_steps}) + + self.actor.train() + + def on_epoch_end(self, runner: dl.IRunner): + num_sessions = 100 + + self.actor.eval() + valid_rewards, valid_steps = generate_sessions( + env=self.env, network=self.actor, num_sessions=num_sessions + ) + self.actor.train() + + valid_rewards /= num_sessions + runner.epoch_metrics["_epoch_"]["num_samples"] = self.session_steps + runner.epoch_metrics["_epoch_"]["updates_per_sample"] = ( + runner.loader_sample_step / self.session_steps + ) + runner.epoch_metrics["_epoch_"]["v_reward"] = valid_rewards + + +class CustomRunner(dl.Runner): + def __init__( + self, *, gamma: float, tau: float, tau_period: int = 1, **kwargs, + ): + super().__init__(**kwargs) + self.gamma = gamma + self.tau = tau + self.tau_period = tau_period + + def on_stage_start(self, runner: dl.IRunner): + super().on_stage_start(runner) + soft_update(self.model["target_actor"], self.model["actor"], 1.0) + soft_update(self.model["target_critic"], self.model["critic"], 1.0) + + def handle_batch(self, batch: Sequence[torch.Tensor]): + # model train/valid step + states, actions, rewards, dones, next_states = batch + actor, target_actor = self.model["actor"], self.model["target_actor"] + critic, target_critic = self.model["critic"], self.model["target_critic"] + actor_optimizer, critic_optimizer = self.optimizer["actor"], self.optimizer["critic"] + + # get actions for the current state + pred_actions = actor(states) + # get q-values for the actions in current states + pred_critic_states = torch.cat([states, pred_actions], 1) + # use q-values to train the actor model + policy_loss = (-critic(pred_critic_states)).mean() + + with torch.no_grad(): + # get possible 
actions for the next states + next_state_actions = target_actor(next_states) + # get possible q-values for the next actions + next_critic_states = torch.cat([next_states, next_state_actions], 1) + next_state_values = target_critic(next_critic_states).detach().squeeze() + next_state_values[dones] = 0.0 + + # compute Bellman's equation value + target_state_values = next_state_values * self.gamma + rewards + # compute predicted values + critic_states = torch.cat([states, actions], 1) + state_values = critic(critic_states).squeeze() + + # train the critic model + value_loss = self.criterion(state_values, target_state_values.detach()) + + self.batch_metrics.update({"critic_loss": value_loss, "actor_loss": policy_loss}) + + if self.is_train_loader: + actor.zero_grad() + actor_optimizer.zero_grad() + policy_loss.backward() + actor_optimizer.step() + + critic.zero_grad() + critic_optimizer.zero_grad() + value_loss.backward() + critic_optimizer.step() + + if self.global_batch_step % self.tau_period == 0: + soft_update(target_actor, actor, self.tau) + soft_update(target_critic, critic, self.tau) + + +if __name__ == "__main__": + # data + batch_size = 64 + epoch_size = int(1e3) * batch_size + buffer_size = int(1e5) + # runner settings, ~training + gamma = 0.99 + tau = 0.01 + tau_period = 1 + # callback, ~exploration + session_period = 1 + sigma = 0.3 + # optimization + lr_actor = 1e-4 + lr_critic = 1e-3 + + # You can change game + # env_name = "LunarLanderContinuous-v2" + env_name = "Pendulum-v0" + env = NormalizedActions(gym.make(env_name)) + replay_buffer = ReplayBuffer(buffer_size) + + actor, target_actor = get_network_actor(env), get_network_actor(env) + critic, target_critic = get_network_critic(env), get_network_critic(env) + utils.set_requires_grad(target_actor, requires_grad=False) + utils.set_requires_grad(target_critic, requires_grad=False) + + models = { + "actor": actor, + "critic": critic, + "target_actor": target_actor, + "target_critic": target_critic, + } + + criterion = torch.nn.MSELoss() + optimizer = { + "actor": torch.optim.Adam(actor.parameters(), lr_actor), + "critic": torch.optim.Adam(critic.parameters(), lr=lr_critic), + } + + loaders = { + "train_game": DataLoader( + ReplayDataset(replay_buffer, epoch_size=epoch_size), batch_size=batch_size, + ), + } + + runner = CustomRunner(gamma=gamma, tau=tau, tau_period=tau_period,) + + runner.train( + model=models, + criterion=criterion, + optimizer=optimizer, + loaders=loaders, + logdir="./logs_ddpg", + num_epochs=10, + verbose=True, + valid_loader="_epoch_", + valid_metric="v_reward", + minimize_valid_metric=False, + load_best_on_end=True, + callbacks=[ + GameCallback( + env=env, + replay_buffer=replay_buffer, + session_period=session_period, + sigma=sigma, + actor_key="actor", + ) + ], + ) + + env = gym.wrappers.Monitor(gym.make(env_name), directory="videos_ddpg", force=True) + generate_sessions(env=env, network=runner.model["actor"], num_sessions=100) + env.close() + + # # show video + # from IPython.display import HTML + # import os + # + # video_names = list(filter(lambda s: s.endswith(".mp4"), os.listdir("./videos_ddpg/"))) + # + # HTML(""" + # + # """.format("./videos/" + video_names[-1])) + # # this may or may not be _last_ video. 
Try other indices diff --git a/examples/reinforcement_learning/dqn.py b/examples/reinforcement_learning/dqn.py new file mode 100644 index 0000000000..6514752424 --- /dev/null +++ b/examples/reinforcement_learning/dqn.py @@ -0,0 +1,359 @@ +from typing import Iterator, Optional, Sequence, Tuple +from collections import deque, namedtuple + +import gym +import numpy as np +import torch +import torch.nn as nn +from torch.utils.data import DataLoader +from torch.utils.data.dataset import IterableDataset + +from catalyst import dl, utils + +# Off-policy common + +Transition = namedtuple( + "Transition", field_names=["state", "action", "reward", "done", "next_state"] +) + + +class ReplayBuffer: + def __init__(self, capacity: int): + self.buffer = deque(maxlen=capacity) + + def append(self, transition: Transition): + self.buffer.append(transition) + + def sample(self, size: int) -> Sequence[np.array]: + indices = np.random.choice(len(self.buffer), size, replace=size > len(self.buffer)) + states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices]) + states, actions, rewards, dones, next_states = ( + np.array(states, dtype=np.float32), + np.array(actions, dtype=np.int64), + np.array(rewards, dtype=np.float32), + np.array(dones, dtype=np.bool), + np.array(next_states, dtype=np.float32), + ) + return states, actions, rewards, dones, next_states + + def __len__(self) -> int: + return len(self.buffer) + + +# as far as RL does not have some predefined dataset, +# we need to specify epoch length by ourselfs +class ReplayDataset(IterableDataset): + def __init__(self, buffer: ReplayBuffer, epoch_size: int = int(1e3)): + self.buffer = buffer + self.epoch_size = epoch_size + + def __iter__(self) -> Iterator[Sequence[np.array]]: + states, actions, rewards, dones, next_states = self.buffer.sample(self.epoch_size) + for i in range(len(dones)): + yield states[i], actions[i], rewards[i], dones[i], next_states[i] + + def __len__(self) -> int: + return self.epoch_size + + +def soft_update(target: nn.Module, source: nn.Module, tau: float): + """Updates the target data with smoothing by ``tau``""" + for target_param, param in zip(target.parameters(), source.parameters()): + target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau) + + +# DQN + + +def get_action(env, network: nn.Module, state: np.array, epsilon: float = -1) -> int: + if np.random.random() < epsilon: + action = env.action_space.sample() + else: + state = torch.tensor(state[None], dtype=torch.float32) + q_values = network(state).detach().cpu().numpy()[0] + action = np.argmax(q_values) + + return int(action) + + +def generate_session( + env, + network: nn.Module, + t_max: int = 1000, + epsilon: float = -1, + replay_buffer: Optional[ReplayBuffer] = None, +) -> Tuple[float, int]: + total_reward = 0 + state = env.reset() + + for t in range(t_max): + action = get_action(env, network, state=state, epsilon=epsilon) + next_state, reward, done, _ = env.step(action) + + if replay_buffer is not None: + transition = Transition(state, action, reward, done, next_state) + replay_buffer.append(transition) + + total_reward += reward + state = next_state + if done: + break + + return total_reward, t + + +def generate_sessions( + env, + network: nn.Module, + t_max: int = 1000, + epsilon: float = -1, + replay_buffer: ReplayBuffer = None, + num_sessions: int = 100, +) -> Tuple[float, int]: + sessions_reward, sessions_steps = 0, 0 + for i_episone in range(num_sessions): + r, t = generate_session( + env=env, network=network, 
t_max=t_max, epsilon=epsilon, replay_buffer=replay_buffer, + ) + sessions_reward += r + sessions_steps += t + return sessions_reward, sessions_steps + + +def get_network(env, num_hidden: int = 128): + inner_fn = utils.get_optimal_inner_init(nn.ReLU) + outer_fn = utils.outer_init + + network = torch.nn.Sequential( + nn.Linear(env.observation_space.shape[0], num_hidden), + nn.ReLU(), + nn.Linear(num_hidden, num_hidden), + nn.ReLU(), + ) + head = nn.Linear(num_hidden, env.action_space.n) + + network.apply(inner_fn) + head.apply(outer_fn) + + return torch.nn.Sequential(network, head) + + +# Catalyst + + +class GameCallback(dl.Callback): + def __init__( + self, + *, + env, + replay_buffer: ReplayBuffer, + session_period: int, + epsilon: float, + epsilon_k: int, + actor_key, + ): + super().__init__(order=0) + self.env = env + self.replay_buffer = replay_buffer + self.session_period = session_period + self.epsilon = epsilon + self.epsilon_k = epsilon_k + self.actor_key = actor_key + self._initialized = False + + def on_epoch_start(self, runner: dl.IRunner): + self.epsilon *= self.epsilon_k + self.session_counter = 0 + self.session_steps = 0 + + if self._initialized: + return + + self.actor = runner.model[self.actor_key] + + self.actor.eval() + generate_sessions( + env=self.env, + network=self.actor, + epsilon=self.epsilon, + replay_buffer=self.replay_buffer, + num_sessions=1000, + ) + self.actor.train() + self._initialized = True + + def on_batch_end(self, runner: dl.IRunner): + if runner.global_batch_step % self.session_period == 0: + self.actor.eval() + + session_reward, session_steps = generate_session( + env=self.env, + network=self.actor, + epsilon=self.epsilon, + replay_buffer=self.replay_buffer, + ) + + self.session_counter += 1 + self.session_steps += session_steps + + runner.batch_metrics.update({"s_reward": session_reward}) + runner.batch_metrics.update({"s_steps": session_steps}) + + self.actor.train() + + def on_epoch_end(self, runner: dl.IRunner): + num_sessions = 100 + + self.actor.eval() + valid_rewards, valid_steps = generate_sessions( + env=self.env, network=self.actor, num_sessions=num_sessions + ) + self.actor.train() + + valid_rewards /= num_sessions + runner.epoch_metrics["_epoch_"]["num_samples"] = self.session_steps + runner.epoch_metrics["_epoch_"]["updates_per_sample"] = ( + runner.loader_sample_step / self.session_steps + ) + runner.epoch_metrics["_epoch_"]["v_reward"] = valid_rewards + runner.epoch_metrics["_epoch_"]["epsilon"] = self.epsilon + + +class CustomRunner(dl.Runner): + def __init__( + self, + *, + gamma: float, + tau: float, + tau_period: int = 1, + origin_key: str = "origin", + target_key: str = "target", + **kwargs, + ): + super().__init__(**kwargs) + self.gamma: float = gamma + self.tau: float = tau + self.tau_period: int = tau_period + self.origin_key: str = origin_key + self.target_key: str = target_key + self.origin_network: nn.Module = None + self.target_network: nn.Module = None + self._initialized = False + + def on_stage_start(self, runner: dl.IRunner): + super().on_stage_start(runner) + if self._initialized: + return + self.origin_network = self.model[self.origin_key] + self.target_network = self.model[self.target_key] + soft_update(self.target_network, self.origin_network, 1.0) + + def handle_batch(self, batch: Sequence[np.array]): + # model train/valid step + states, actions, rewards, dones, next_states = batch + network, target_network = self.origin_network, self.target_network + + # get q-values for all actions in current states + state_qvalues 
= network(states) + # select q-values for chosen actions + state_action_qvalues = state_qvalues.gather(1, actions.unsqueeze(-1)).squeeze(-1) + + # compute q-values for all actions in next states + # compute V*(next_states) using predicted next q-values + # at the last state we shall use simplified formula: + # Q(s,a) = r(s,a) since s' doesn't exist + with torch.no_grad(): + next_state_qvalues = target_network(next_states) + next_state_values = next_state_qvalues.max(1)[0] + next_state_values[dones] = 0.0 + next_state_values = next_state_values.detach() + + # compute "target q-values" for loss, + # it's what's inside square parentheses in the above formula. + target_state_action_qvalues = next_state_values * self.gamma + rewards + + # mean squared error loss to minimize + loss = self.criterion(state_action_qvalues, target_state_action_qvalues.detach()) + self.batch_metrics.update({"loss": loss}) + + if self.is_train_loader: + loss.backward() + self.optimizer.step() + self.optimizer.zero_grad() + + if self.global_batch_step % self.tau_period == 0: + soft_update(target_network, network, self.tau) + + +if __name__ == "__main__": + batch_size = 64 + epoch_size = int(1e3) * batch_size + buffer_size = int(1e5) + # runner settings, ~training + gamma = 0.99 + tau = 0.01 + tau_period = 1 # in batches + # callback, ~exploration + session_period = 100 # in batches + epsilon = 0.98 + epsilon_k = 0.9 + # optimization + lr = 3e-4 + + # env_name = "LunarLander-v2" + env_name = "CartPole-v1" + env = gym.make(env_name) + replay_buffer = ReplayBuffer(buffer_size) + + network, target_network = get_network(env), get_network(env) + utils.set_requires_grad(target_network, requires_grad=False) + models = {"origin": network, "target": target_network} + criterion = torch.nn.MSELoss() + optimizer = torch.optim.Adam(network.parameters(), lr=lr) + loaders = { + "train_game": DataLoader( + ReplayDataset(replay_buffer, epoch_size=epoch_size), batch_size=batch_size, + ), + } + + runner = CustomRunner(gamma=gamma, tau=tau, tau_period=tau_period) + runner.train( + model=models, + criterion=criterion, + optimizer=optimizer, + loaders=loaders, + logdir="./logs_dqn", + num_epochs=10, + verbose=True, + valid_loader="_epoch_", + valid_metric="v_reward", + minimize_valid_metric=False, + load_best_on_end=True, + callbacks=[ + GameCallback( + env=env, + replay_buffer=replay_buffer, + session_period=session_period, + epsilon=epsilon, + epsilon_k=epsilon_k, + actor_key="origin", + ) + ], + ) + + env = gym.wrappers.Monitor(gym.make(env_name), directory="videos_dqn", force=True) + generate_sessions(env=env, network=runner.model["origin"], num_sessions=100) + env.close() + + # # show video + # from IPython.display import HTML + # import os + # + # video_names = list(filter(lambda s: s.endswith(".mp4"), os.listdir("./videos_dqn/"))) + # + # HTML(""" + # + # """.format("./videos/" + video_names[-1])) + # # this may or may not be _last_ video. 
Try other indices diff --git a/examples/reinforcement_learning/reinforce.py b/examples/reinforcement_learning/reinforce.py new file mode 100644 index 0000000000..b4d24e47c4 --- /dev/null +++ b/examples/reinforcement_learning/reinforce.py @@ -0,0 +1,264 @@ +import typing as tp +from collections import deque, namedtuple + +import gym +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +from torch.utils.data import DataLoader +from torch.utils.data.dataset import IterableDataset + +from catalyst import dl, utils + +# On-policy common + +Rollout = namedtuple("Rollout", field_names=["states", "actions", "rewards",]) + + +class RolloutBuffer: + def __init__(self, capacity: int): + self.capacity = capacity + self.buffer = deque(maxlen=capacity) + + def append(self, rollout: Rollout): + self.buffer.append(rollout) + + def sample(self, idx: int) -> tp.Sequence[np.array]: + states, actions, rewards = self.buffer[idx] + states, actions, rewards = ( + np.array(states, dtype=np.float32), + np.array(actions, dtype=np.int64), + np.array(rewards, dtype=np.float32), + ) + return states, actions, rewards + + def __len__(self) -> int: + return len(self.buffer) + + +# as far as RL does not have some predefined dataset, +# we need to specify epoch length by ourselfs +class RolloutDataset(IterableDataset): + def __init__(self, buffer: RolloutBuffer): + self.buffer = buffer + + def __iter__(self) -> tp.Iterator[tp.Sequence[np.array]]: + for i in range(len(self.buffer)): + states, actions, rewards = self.buffer.sample(i) + yield states, actions, rewards + self.buffer.buffer.clear() + + def __len__(self) -> int: + return self.buffer.capacity + + +# REINFORCE + + +def get_cumulative_rewards(rewards, gamma=0.99): + G = [rewards[-1]] + for r in reversed(rewards[:-1]): + G.insert(0, r + gamma * G[0]) + return G + + +def to_one_hot(y, n_dims=None): + """ Takes an integer vector and converts it to 1-hot matrix. 
""" + y_tensor = y + y_tensor = y_tensor.type(torch.LongTensor).view(-1, 1) + n_dims = n_dims if n_dims is not None else int(torch.max(y_tensor)) + 1 + y_one_hot = torch.zeros(y_tensor.size()[0], n_dims).scatter_(1, y_tensor, 1) + return y_one_hot + + +def get_action(env, network: nn.Module, state: np.array, epsilon: float = -1) -> int: + state = torch.tensor(state[None], dtype=torch.float32) + logits = network(state).detach() + probas = F.softmax(logits, -1).cpu().numpy()[0] + action = np.random.choice(len(probas), p=probas) + return int(action) + + +def generate_session( + env, + network: nn.Module, + t_max: int = 1000, + epsilon: float = -1, + rollout_buffer: tp.Optional[RolloutBuffer] = None, +) -> tp.Tuple[float, int]: + total_reward = 0 + states, actions, rewards = [], [], [] + state = env.reset() + + for t in range(t_max): + action = get_action(env, network, state=state, epsilon=epsilon) + next_state, reward, done, _ = env.step(action) + + # record session history to train later + states.append(state) + actions.append(action) + rewards.append(reward) + + total_reward += reward + state = next_state + if done: + break + if rollout_buffer is not None: + rollout_buffer.append(Rollout(states, actions, rewards)) + + return total_reward, t + + +def generate_sessions( + env, + network: nn.Module, + t_max: int = 1000, + epsilon: float = -1, + rollout_buffer: tp.Optional[RolloutBuffer] = None, + num_sessions: int = 100, +) -> tp.Tuple[float, int]: + sessions_reward, sessions_steps = 0, 0 + for i_episone in range(num_sessions): + r, t = generate_session( + env=env, network=network, t_max=t_max, epsilon=epsilon, rollout_buffer=rollout_buffer, + ) + sessions_reward += r + sessions_steps += t + return sessions_reward, sessions_steps + + +def get_network(env, num_hidden: int = 128): + inner_fn = utils.get_optimal_inner_init(nn.ReLU) + outer_fn = utils.outer_init + + network = torch.nn.Sequential( + nn.Linear(env.observation_space.shape[0], num_hidden), + nn.ReLU(), + nn.Linear(num_hidden, num_hidden), + nn.ReLU(), + ) + head = nn.Linear(num_hidden, env.action_space.n) + + network.apply(inner_fn) + head.apply(outer_fn) + + return torch.nn.Sequential(network, head) + + +# Catalyst + + +class GameCallback(dl.Callback): + def __init__(self, *, env, rollout_buffer: RolloutBuffer): + super().__init__(order=0) + self.env = env + self.rollout_buffer = rollout_buffer + + def on_epoch_start(self, runner: dl.IRunner): + self.actor = runner.model + + self.actor.eval() + generate_sessions( + env=self.env, network=self.actor, rollout_buffer=self.rollout_buffer, num_sessions=100, + ) + self.actor.train() + + def on_epoch_end(self, runner: dl.IRunner): + num_sessions = 100 + + self.actor.eval() + valid_rewards, valid_steps = generate_sessions( + env=self.env, network=self.actor, num_sessions=num_sessions + ) + self.actor.train() + + valid_rewards /= num_sessions + runner.epoch_metrics["_epoch_"]["v_reward"] = valid_rewards + + +class CustomRunner(dl.Runner): + def __init__( + self, *, gamma: float, **kwargs, + ): + super().__init__(**kwargs) + self.gamma: float = gamma + self._initialized = False + + def handle_batch(self, batch: tp.Sequence[np.array]): + # model train/valid step + # ATTENTION: + # because of different trajectories lens + # ONLY batch_size==1 supported + states, actions, rewards = batch + states, actions, rewards = states[0], actions[0], rewards[0] + cumulative_returns = torch.tensor(get_cumulative_rewards(rewards, gamma)) + network = self.model + + logits = network(states) + probas = 
F.softmax(logits, -1) + logprobas = F.log_softmax(logits, -1) + n_actions = probas.shape[1] + logprobas_for_actions = torch.sum(logprobas * to_one_hot(actions, n_dims=n_actions), dim=1) + + J_hat = torch.mean(logprobas_for_actions * cumulative_returns) + entropy_reg = -torch.mean(torch.sum(probas * logprobas, dim=1)) + loss = -J_hat - 0.1 * entropy_reg + + self.batch_metrics.update({"loss": loss}) + if self.is_train_loader: + loss.backward() + self.optimizer.step() + self.optimizer.zero_grad() + + +if __name__ == "__main__": + batch_size = 1 + epoch_size = int(1e3) * batch_size + buffer_size = int(1e2) + # runner settings + gamma = 0.99 + # optimization + lr = 3e-4 + + # env_name = "LunarLander-v2" + env_name = "CartPole-v1" + env = gym.make(env_name) + rollout_buffer = RolloutBuffer(buffer_size) + + model = get_network(env) + optimizer = torch.optim.Adam(model.parameters(), lr=lr) + loaders = { + "train_game": DataLoader(RolloutDataset(rollout_buffer), batch_size=batch_size,), + } + + runner = CustomRunner(gamma=gamma) + runner.train( + model=model, + optimizer=optimizer, + loaders=loaders, + logdir="./logs_dqn", + num_epochs=10, + verbose=True, + valid_loader="_epoch_", + valid_metric="v_reward", + minimize_valid_metric=False, + load_best_on_end=True, + callbacks=[GameCallback(env=env, rollout_buffer=rollout_buffer,)], + ) + + env = gym.wrappers.Monitor(gym.make(env_name), directory="videos_reinforce", force=True) + generate_sessions(env=env, network=model, num_sessions=100) + env.close() + + # from IPython.display import HTML + # import os + # + # video_names = list(filter(lambda s: s.endswith(".mp4"), os.listdir("./videos_reinforce/"))) + # + # HTML(""" + # + # """.format("./videos/" + video_names[-1])) + # # this may or may not be _last_ video. Try other indices From 3b0371c3a0e295262f27c292963385e018a5b348 Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Fri, 7 May 2021 08:49:42 +0300 Subject: [PATCH 6/9] minimal RL --- examples/reinforcement_learning/.DS_Store | Bin 6148 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 examples/reinforcement_learning/.DS_Store diff --git a/examples/reinforcement_learning/.DS_Store b/examples/reinforcement_learning/.DS_Store deleted file mode 100644 index b1f5e4e37260b95ff18fff0483ab75a80c0cafbe..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKJ5B>Z4D~>YD4|G6fy9a%qzwg2w45LZ0Od=hO*RBUy#gW8a+35^NE`qP&cF#c z0MBC!cC!f~4FY6G_S>DYXFN}ubxcIAdC{#C)rhDJWh_--76{L?7Gy(5wt&vBk%|IY{6n=xPv{3`}rDQZPc97$_y w;c#4Q1Lz5qh5b@N9|EK0*gtS7-iCU?T)+>&P%#z62*myfcpA(Y13${ZJ8U_OIRF3v From 5c24def8b5fa861855e00469ec0fa9db6451792d Mon Sep 17 00:00:00 2001 From: Sergey Kolesnikov Date: Fri, 7 May 2021 10:26:02 +0300 Subject: [PATCH 7/9] minimal RL 2 --- .../notebooks/reinforcement_learning.ipynb | 1349 +++++++++++++++++ examples/reinforcement_learning/README.md | 2 +- examples/reinforcement_learning/ddpg.py | 1 + examples/reinforcement_learning/dqn.py | 5 +- examples/reinforcement_learning/reinforce.py | 17 +- 5 files changed, 1363 insertions(+), 11 deletions(-) create mode 100644 examples/notebooks/reinforcement_learning.ipynb diff --git a/examples/notebooks/reinforcement_learning.ipynb b/examples/notebooks/reinforcement_learning.ipynb new file mode 100644 index 0000000000..4abf6080c8 --- /dev/null +++ b/examples/notebooks/reinforcement_learning.ipynb @@ -0,0 +1,1349 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "! 
pip install catalyst==21.04.2 gym==0.18.0"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "---"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Off-policy DQN"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Imports"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from typing import Iterator, Optional, Sequence, Tuple\n",
+    "from collections import deque, namedtuple\n",
+    "\n",
+    "import gym\n",
+    "import numpy as np\n",
+    "import torch\n",
+    "import torch.nn as nn\n",
+    "from torch.utils.data import DataLoader\n",
+    "from torch.utils.data.dataset import IterableDataset\n",
+    "\n",
+    "from catalyst import dl, utils"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## RL common"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "Transition = namedtuple(\n",
+    "    \"Transition\", field_names=[\"state\", \"action\", \"reward\", \"done\", \"next_state\"]\n",
+    ")\n",
+    "\n",
+    "\n",
+    "class ReplayBuffer:\n",
+    "    def __init__(self, capacity: int):\n",
+    "        self.buffer = deque(maxlen=capacity)\n",
+    "\n",
+    "    def append(self, transition: Transition):\n",
+    "        self.buffer.append(transition)\n",
+    "\n",
+    "    def sample(self, size: int) -> Sequence[np.array]:\n",
+    "        indices = np.random.choice(len(self.buffer), size, replace=size > len(self.buffer))\n",
+    "        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])\n",
+    "        states, actions, rewards, dones, next_states = (\n",
+    "            np.array(states, dtype=np.float32),\n",
+    "            np.array(actions, dtype=np.int64),\n",
+    "            np.array(rewards, dtype=np.float32),\n",
+    "            np.array(dones, dtype=np.bool),\n",
+    "            np.array(next_states, dtype=np.float32),\n",
+    "        )\n",
+    "        return states, actions, rewards, dones, next_states\n",
+    "\n",
+    "    def __len__(self) -> int:\n",
+    "        return len(self.buffer)\n",
+    "\n",
+    "\n",
+    "# since RL does not come with a predefined dataset,\n",
+    "# we need to specify the epoch length ourselves\n",
+    "class ReplayDataset(IterableDataset):\n",
+    "    def __init__(self, buffer: ReplayBuffer, epoch_size: int = int(1e3)):\n",
+    "        self.buffer = buffer\n",
+    "        self.epoch_size = epoch_size\n",
+    "\n",
+    "    def __iter__(self) -> Iterator[Sequence[np.array]]:\n",
+    "        states, actions, rewards, dones, next_states = self.buffer.sample(self.epoch_size)\n",
+    "        for i in range(len(dones)):\n",
+    "            yield states[i], actions[i], rewards[i], dones[i], next_states[i]\n",
+    "\n",
+    "    def __len__(self) -> int:\n",
+    "        return self.epoch_size\n",
+    "\n",
+    "\n",
+    "def soft_update(target: nn.Module, source: nn.Module, tau: float):\n",
+    "    \"\"\"Updates the target data with smoothing by ``tau``\"\"\"\n",
+    "    for target_param, param in zip(target.parameters(), source.parameters()):\n",
+    "        target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## DQN"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def get_action(env, network: nn.Module, state: np.array, epsilon: float = -1) -> int:\n",
+    "    if np.random.random() < epsilon:\n",
+    "        action = env.action_space.sample()\n",
+    "    else:\n",
+    "        state = torch.tensor(state[None], dtype=torch.float32)\n",
+    "        q_values = network(state).detach().cpu().numpy()[0]\n",
+    "        action = np.argmax(q_values)\n",
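+    "        # greedy branch: pick the action with the highest predicted Q-value;\n",
+    "        # together with the random branch above this implements epsilon-greedy exploration\n",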
+ "\n", + " return int(action)\n", + "\n", + "\n", + "def generate_session(\n", + " env,\n", + " network: nn.Module,\n", + " t_max: int = 1000,\n", + " epsilon: float = -1,\n", + " replay_buffer: Optional[ReplayBuffer] = None,\n", + ") -> Tuple[float, int]:\n", + " total_reward = 0\n", + " state = env.reset()\n", + "\n", + " for t in range(t_max):\n", + " action = get_action(env, network, state=state, epsilon=epsilon)\n", + " next_state, reward, done, _ = env.step(action)\n", + "\n", + " if replay_buffer is not None:\n", + " transition = Transition(state, action, reward, done, next_state)\n", + " replay_buffer.append(transition)\n", + "\n", + " total_reward += reward\n", + " state = next_state\n", + " if done:\n", + " break\n", + "\n", + " return total_reward, t\n", + "\n", + "\n", + "def generate_sessions(\n", + " env,\n", + " network: nn.Module,\n", + " t_max: int = 1000,\n", + " epsilon: float = -1,\n", + " replay_buffer: ReplayBuffer = None,\n", + " num_sessions: int = 100,\n", + ") -> Tuple[float, int]:\n", + " sessions_reward, sessions_steps = 0, 0\n", + " for i_episone in range(num_sessions):\n", + " r, t = generate_session(\n", + " env=env, network=network, t_max=t_max, epsilon=epsilon, replay_buffer=replay_buffer,\n", + " )\n", + " sessions_reward += r\n", + " sessions_steps += t\n", + " return sessions_reward, sessions_steps" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_network(env, num_hidden: int = 128):\n", + " inner_fn = utils.get_optimal_inner_init(nn.ReLU)\n", + " outer_fn = utils.outer_init\n", + "\n", + " network = torch.nn.Sequential(\n", + " nn.Linear(env.observation_space.shape[0], num_hidden),\n", + " nn.ReLU(),\n", + " nn.Linear(num_hidden, num_hidden),\n", + " nn.ReLU(),\n", + " )\n", + " head = nn.Linear(num_hidden, env.action_space.n)\n", + "\n", + " network.apply(inner_fn)\n", + " head.apply(outer_fn)\n", + "\n", + " return torch.nn.Sequential(network, head)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Catalyst" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class GameCallback(dl.Callback):\n", + " def __init__(\n", + " self,\n", + " *,\n", + " env,\n", + " replay_buffer: ReplayBuffer,\n", + " session_period: int,\n", + " epsilon: float,\n", + " epsilon_k: int,\n", + " actor_key,\n", + " ):\n", + " super().__init__(order=0)\n", + " self.env = env\n", + " self.replay_buffer = replay_buffer\n", + " self.session_period = session_period\n", + " self.epsilon = epsilon\n", + " self.epsilon_k = epsilon_k\n", + " self.actor_key = actor_key\n", + " self._initialized = False\n", + "\n", + " def on_epoch_start(self, runner: dl.IRunner):\n", + " self.epsilon *= self.epsilon_k\n", + " self.session_counter = 0\n", + " self.session_steps = 0\n", + "\n", + " if self._initialized:\n", + " return\n", + "\n", + " self.actor = runner.model[self.actor_key]\n", + "\n", + " self.actor.eval()\n", + " generate_sessions(\n", + " env=self.env,\n", + " network=self.actor,\n", + " epsilon=self.epsilon,\n", + " replay_buffer=self.replay_buffer,\n", + " num_sessions=1000,\n", + " )\n", + " self.actor.train()\n", + " self._initialized = True\n", + "\n", + " def on_batch_end(self, runner: dl.IRunner):\n", + " if runner.global_batch_step % self.session_period == 0:\n", + " self.actor.eval()\n", + "\n", + " session_reward, session_steps = generate_session(\n", + " env=self.env,\n", + " network=self.actor,\n", + " 
epsilon=self.epsilon,\n", + " replay_buffer=self.replay_buffer,\n", + " )\n", + "\n", + " self.session_counter += 1\n", + " self.session_steps += session_steps\n", + "\n", + " runner.batch_metrics.update({\"s_reward\": session_reward})\n", + " runner.batch_metrics.update({\"s_steps\": session_steps})\n", + "\n", + " self.actor.train()\n", + "\n", + " def on_epoch_end(self, runner: dl.IRunner):\n", + " num_sessions = 100\n", + "\n", + " self.actor.eval()\n", + " valid_rewards, valid_steps = generate_sessions(\n", + " env=self.env, network=self.actor, num_sessions=num_sessions\n", + " )\n", + " self.actor.train()\n", + "\n", + " valid_rewards /= num_sessions\n", + " runner.epoch_metrics[\"_epoch_\"][\"num_samples\"] = self.session_steps\n", + " runner.epoch_metrics[\"_epoch_\"][\"updates_per_sample\"] = (\n", + " runner.loader_sample_step / self.session_steps\n", + " )\n", + " runner.epoch_metrics[\"_epoch_\"][\"v_reward\"] = valid_rewards\n", + " runner.epoch_metrics[\"_epoch_\"][\"epsilon\"] = self.epsilon" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class CustomRunner(dl.Runner):\n", + " def __init__(\n", + " self,\n", + " *,\n", + " gamma: float,\n", + " tau: float,\n", + " tau_period: int = 1,\n", + " origin_key: str = \"origin\",\n", + " target_key: str = \"target\",\n", + " **kwargs,\n", + " ):\n", + " super().__init__(**kwargs)\n", + " self.gamma: float = gamma\n", + " self.tau: float = tau\n", + " self.tau_period: int = tau_period\n", + " self.origin_key: str = origin_key\n", + " self.target_key: str = target_key\n", + " self.origin_network: nn.Module = None\n", + " self.target_network: nn.Module = None\n", + " self._initialized = False\n", + "\n", + " def on_stage_start(self, runner: dl.IRunner):\n", + " super().on_stage_start(runner)\n", + " if self._initialized:\n", + " return\n", + " self.origin_network = self.model[self.origin_key]\n", + " self.target_network = self.model[self.target_key]\n", + " soft_update(self.target_network, self.origin_network, 1.0)\n", + "\n", + " def handle_batch(self, batch: Sequence[np.array]):\n", + " # model train/valid step\n", + " states, actions, rewards, dones, next_states = batch\n", + " network, target_network = self.origin_network, self.target_network\n", + "\n", + " # get q-values for all actions in current states\n", + " state_qvalues = network(states)\n", + " # select q-values for chosen actions\n", + " state_action_qvalues = state_qvalues.gather(1, actions.unsqueeze(-1)).squeeze(-1)\n", + "\n", + " # compute q-values for all actions in next states\n", + " # compute V*(next_states) using predicted next q-values\n", + " # at the last state we shall use simplified formula:\n", + " # Q(s,a) = r(s,a) since s' doesn't exist\n", + " with torch.no_grad():\n", + " next_state_qvalues = target_network(next_states)\n", + " next_state_values = next_state_qvalues.max(1)[0]\n", + " next_state_values[dones] = 0.0\n", + " next_state_values = next_state_values.detach()\n", + "\n", + " # compute \"target q-values\" for loss,\n", + " # it's what's inside square parentheses in the above formula.\n", + " target_state_action_qvalues = next_state_values * self.gamma + rewards\n", + "\n", + " # mean squared error loss to minimize\n", + " loss = self.criterion(state_action_qvalues, target_state_action_qvalues.detach())\n", + " self.batch_metrics.update({\"loss\": loss})\n", + "\n", + " if self.is_train_loader:\n", + " loss.backward()\n", + " self.optimizer.step()\n", + " 
self.optimizer.zero_grad()\n", + "\n", + " if self.global_batch_step % self.tau_period == 0:\n", + " soft_update(target_network, network, self.tau)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Training" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "batch_size = 64\n", + "epoch_size = int(1e3) * batch_size\n", + "buffer_size = int(1e5)\n", + "# runner settings, ~training\n", + "gamma = 0.99\n", + "tau = 0.01\n", + "tau_period = 1 # in batches\n", + "# callback, ~exploration\n", + "session_period = 100 # in batches\n", + "epsilon = 0.98\n", + "epsilon_k = 0.9\n", + "# optimization\n", + "lr = 3e-4\n", + "\n", + "# env_name = \"LunarLander-v2\"\n", + "env_name = \"CartPole-v1\"\n", + "env = gym.make(env_name)\n", + "replay_buffer = ReplayBuffer(buffer_size)\n", + "\n", + "network, target_network = get_network(env), get_network(env)\n", + "utils.set_requires_grad(target_network, requires_grad=False)\n", + "models = {\"origin\": network, \"target\": target_network}\n", + "criterion = torch.nn.MSELoss()\n", + "optimizer = torch.optim.Adam(network.parameters(), lr=lr)\n", + "loaders = {\n", + " \"train_game\": DataLoader(\n", + " ReplayDataset(replay_buffer, epoch_size=epoch_size), batch_size=batch_size,\n", + " ),\n", + "}\n", + "\n", + "runner = CustomRunner(gamma=gamma, tau=tau, tau_period=tau_period)\n", + "runner.train(\n", + " model=models,\n", + " criterion=criterion,\n", + " optimizer=optimizer,\n", + " loaders=loaders,\n", + " logdir=\"./logs_dqn\",\n", + " num_epochs=10,\n", + " verbose=True,\n", + " valid_loader=\"_epoch_\",\n", + " valid_metric=\"v_reward\",\n", + " minimize_valid_metric=False,\n", + " load_best_on_end=True,\n", + " callbacks=[\n", + " GameCallback(\n", + " env=env,\n", + " replay_buffer=replay_buffer,\n", + " session_period=session_period,\n", + " epsilon=epsilon,\n", + " epsilon_k=epsilon_k,\n", + " actor_key=\"origin\",\n", + " )\n", + " ],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Evaluating" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "env = gym.wrappers.Monitor(gym.make(env_name), directory=\"videos_dqn\", force=True)\n", + "generate_sessions(env=env, network=runner.model[\"origin\"], num_sessions=100)\n", + "env.close()\n", + "\n", + "# show video\n", + "from IPython.display import HTML\n", + "import os\n", + "\n", + "video_names = list(filter(lambda s: s.endswith(\".mp4\"), os.listdir(\"./videos_dqn/\")))\n", + "\n", + "HTML(\"\"\"\n", + "\n", + "\"\"\".format(\"./videos/\" + video_names[-1]))\n", + "# this may or may not be _last_ video. 
Try other indices" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Off-policy DDPG" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Iterator, Optional, Sequence, Tuple\n", + "from collections import deque, namedtuple\n", + "\n", + "import gym\n", + "import numpy as np\n", + "import torch\n", + "import torch.nn as nn\n", + "from torch.utils.data import DataLoader\n", + "from torch.utils.data.dataset import IterableDataset\n", + "\n", + "from catalyst import dl, utils" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## RL common" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "Transition = namedtuple(\n", + " \"Transition\", field_names=[\"state\", \"action\", \"reward\", \"done\", \"next_state\"]\n", + ")\n", + "\n", + "\n", + "class ReplayBuffer:\n", + " def __init__(self, capacity: int):\n", + " self.buffer = deque(maxlen=capacity)\n", + "\n", + " def append(self, transition: Transition):\n", + " self.buffer.append(transition)\n", + "\n", + " def sample(self, size: int) -> Sequence[np.array]:\n", + " indices = np.random.choice(len(self.buffer), size, replace=size > len(self.buffer))\n", + " states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])\n", + " states, actions, rewards, dones, next_states = (\n", + " np.array(states, dtype=np.float32),\n", + " np.array(actions, dtype=np.int64),\n", + " np.array(rewards, dtype=np.float32),\n", + " np.array(dones, dtype=np.bool),\n", + " np.array(next_states, dtype=np.float32),\n", + " )\n", + " return states, actions, rewards, dones, next_states\n", + "\n", + " def __len__(self) -> int:\n", + " return len(self.buffer)\n", + "\n", + "\n", + "# as far as RL does not have some predefined dataset,\n", + "# we need to specify epoch length by ourselfs\n", + "class ReplayDataset(IterableDataset):\n", + " def __init__(self, buffer: ReplayBuffer, epoch_size: int = int(1e3)):\n", + " self.buffer = buffer\n", + " self.epoch_size = epoch_size\n", + "\n", + " def __iter__(self) -> Iterator[Sequence[np.array]]:\n", + " states, actions, rewards, dones, next_states = self.buffer.sample(self.epoch_size)\n", + " for i in range(len(dones)):\n", + " yield states[i], actions[i], rewards[i], dones[i], next_states[i]\n", + "\n", + " def __len__(self) -> int:\n", + " return self.epoch_size\n", + "\n", + "\n", + "def soft_update(target: nn.Module, source: nn.Module, tau: float):\n", + " \"\"\"Updates the target data with smoothing by ``tau``\"\"\"\n", + " for target_param, param in zip(target.parameters(), source.parameters()):\n", + " target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## DDPG" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class NormalizedActions(gym.ActionWrapper):\n", + " def action(self, action: float) -> float:\n", + " low_bound = self.action_space.low\n", + " upper_bound = self.action_space.high\n", + "\n", + " action = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound)\n", + " action = np.clip(action, low_bound, upper_bound)\n", + "\n", + " return action\n", + "\n", + " 
def _reverse_action(self, action: float) -> float:\n", + " low_bound = self.action_space.low\n", + " upper_bound = self.action_space.high\n", + "\n", + " action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1\n", + " action = np.clip(action, low_bound, upper_bound)\n", + "\n", + " return action\n", + "\n", + "\n", + "def get_action(\n", + " env, network: nn.Module, state: np.array, sigma: Optional[float] = None\n", + ") -> np.array:\n", + " state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)\n", + " action = network(state).detach().cpu().numpy()[0]\n", + " if sigma is not None:\n", + " action = np.random.normal(action, sigma)\n", + " return action\n", + "\n", + "\n", + "def generate_session(\n", + " env,\n", + " network: nn.Module,\n", + " sigma: Optional[float] = None,\n", + " replay_buffer: Optional[ReplayBuffer] = None,\n", + ") -> Tuple[float, int]:\n", + " total_reward = 0\n", + " state = env.reset()\n", + "\n", + " for t in range(env.spec.max_episode_steps):\n", + " action = get_action(env, network, state=state, sigma=sigma)\n", + " next_state, reward, done, _ = env.step(action)\n", + "\n", + " if replay_buffer is not None:\n", + " transition = Transition(state, action, reward, done, next_state)\n", + " replay_buffer.append(transition)\n", + "\n", + " total_reward += reward\n", + " state = next_state\n", + " if done:\n", + " break\n", + "\n", + " return total_reward, t\n", + "\n", + "\n", + "def generate_sessions(\n", + " env,\n", + " network: nn.Module,\n", + " sigma: Optional[float] = None,\n", + " replay_buffer: Optional[ReplayBuffer] = None,\n", + " num_sessions: int = 100,\n", + ") -> Tuple[float, int]:\n", + " sessions_reward, sessions_steps = 0, 0\n", + " for i_episone in range(num_sessions):\n", + " r, t = generate_session(\n", + " env=env, network=network, sigma=sigma, replay_buffer=replay_buffer,\n", + " )\n", + " sessions_reward += r\n", + " sessions_steps += t\n", + " return sessions_reward, sessions_steps" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_network_actor(env):\n", + " inner_fn = utils.get_optimal_inner_init(nn.ReLU)\n", + " outer_fn = utils.outer_init\n", + "\n", + " network = torch.nn.Sequential(\n", + " nn.Linear(env.observation_space.shape[0], 400), nn.ReLU(), nn.Linear(400, 300), nn.ReLU(),\n", + " )\n", + " head = torch.nn.Sequential(nn.Linear(300, 1), nn.Tanh())\n", + "\n", + " network.apply(inner_fn)\n", + " head.apply(outer_fn)\n", + "\n", + " return torch.nn.Sequential(network, head)\n", + "\n", + "\n", + "def get_network_critic(env):\n", + " inner_fn = utils.get_optimal_inner_init(nn.LeakyReLU)\n", + " outer_fn = utils.outer_init\n", + "\n", + " network = torch.nn.Sequential(\n", + " nn.Linear(env.observation_space.shape[0] + 1, 400),\n", + " nn.LeakyReLU(0.01),\n", + " nn.Linear(400, 300),\n", + " nn.LeakyReLU(0.01),\n", + " )\n", + " head = nn.Linear(300, 1)\n", + "\n", + " network.apply(inner_fn)\n", + " head.apply(outer_fn)\n", + "\n", + " return torch.nn.Sequential(network, head)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Catalyst" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class GameCallback(dl.Callback):\n", + " def __init__(\n", + " self,\n", + " *,\n", + " env,\n", + " replay_buffer: ReplayBuffer,\n", + " session_period: int,\n", + " sigma: float,\n", + " actor_key: str,\n", + " ):\n", + " super().__init__(order=0)\n", + " self.env = 
env\n", + " self.replay_buffer = replay_buffer\n", + " self.session_period = session_period\n", + " self.sigma = sigma\n", + " self.actor_key = actor_key\n", + "\n", + " def on_stage_start(self, runner: dl.IRunner):\n", + " self.actor = runner.model[self.actor_key]\n", + "\n", + " self.actor.eval()\n", + " generate_sessions(\n", + " env=self.env,\n", + " network=self.actor,\n", + " sigma=self.sigma,\n", + " replay_buffer=self.replay_buffer,\n", + " num_sessions=1000,\n", + " )\n", + " self.actor.train()\n", + "\n", + " def on_epoch_start(self, runner: dl.IRunner):\n", + " self.session_counter = 0\n", + " self.session_steps = 0\n", + "\n", + " def on_batch_end(self, runner: dl.IRunner):\n", + " if runner.global_batch_step % self.session_period == 0:\n", + " self.actor.eval()\n", + "\n", + " session_reward, session_steps = generate_session(\n", + " env=self.env,\n", + " network=self.actor,\n", + " sigma=self.sigma,\n", + " replay_buffer=self.replay_buffer,\n", + " )\n", + "\n", + " self.session_counter += 1\n", + " self.session_steps += session_steps\n", + "\n", + " runner.batch_metrics.update({\"s_reward\": session_reward})\n", + " runner.batch_metrics.update({\"s_steps\": session_steps})\n", + "\n", + " self.actor.train()\n", + "\n", + " def on_epoch_end(self, runner: dl.IRunner):\n", + " num_sessions = 100\n", + "\n", + " self.actor.eval()\n", + " valid_rewards, valid_steps = generate_sessions(\n", + " env=self.env, network=self.actor, num_sessions=num_sessions\n", + " )\n", + " self.actor.train()\n", + "\n", + " valid_rewards /= num_sessions\n", + " runner.epoch_metrics[\"_epoch_\"][\"num_samples\"] = self.session_steps\n", + " runner.epoch_metrics[\"_epoch_\"][\"updates_per_sample\"] = (\n", + " runner.loader_sample_step / self.session_steps\n", + " )\n", + " runner.epoch_metrics[\"_epoch_\"][\"v_reward\"] = valid_rewards" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class CustomRunner(dl.Runner):\n", + " def __init__(\n", + " self, *, gamma: float, tau: float, tau_period: int = 1, **kwargs,\n", + " ):\n", + " super().__init__(**kwargs)\n", + " self.gamma = gamma\n", + " self.tau = tau\n", + " self.tau_period = tau_period\n", + "\n", + " def on_stage_start(self, runner: dl.IRunner):\n", + " super().on_stage_start(runner)\n", + " soft_update(self.model[\"target_actor\"], self.model[\"actor\"], 1.0)\n", + " soft_update(self.model[\"target_critic\"], self.model[\"critic\"], 1.0)\n", + "\n", + " def handle_batch(self, batch: Sequence[torch.Tensor]):\n", + " # model train/valid step\n", + " states, actions, rewards, dones, next_states = batch\n", + " actor, target_actor = self.model[\"actor\"], self.model[\"target_actor\"]\n", + " critic, target_critic = self.model[\"critic\"], self.model[\"target_critic\"]\n", + " actor_optimizer, critic_optimizer = self.optimizer[\"actor\"], self.optimizer[\"critic\"]\n", + "\n", + " # get actions for the current state\n", + " pred_actions = actor(states)\n", + " # get q-values for the actions in current states\n", + " pred_critic_states = torch.cat([states, pred_actions], 1)\n", + " # use q-values to train the actor model\n", + " policy_loss = (-critic(pred_critic_states)).mean()\n", + "\n", + " with torch.no_grad():\n", + " # get possible actions for the next states\n", + " next_state_actions = target_actor(next_states)\n", + " # get possible q-values for the next actions\n", + " next_critic_states = torch.cat([next_states, next_state_actions], 1)\n", + " next_state_values = 
target_critic(next_critic_states).detach().squeeze()\n", + " next_state_values[dones] = 0.0\n", + "\n", + " # compute Bellman's equation value\n", + " target_state_values = next_state_values * self.gamma + rewards\n", + " # compute predicted values\n", + " critic_states = torch.cat([states, actions], 1)\n", + " state_values = critic(critic_states).squeeze()\n", + "\n", + " # train the critic model\n", + " value_loss = self.criterion(state_values, target_state_values.detach())\n", + "\n", + " self.batch_metrics.update({\"critic_loss\": value_loss, \"actor_loss\": policy_loss})\n", + "\n", + " if self.is_train_loader:\n", + " actor.zero_grad()\n", + " actor_optimizer.zero_grad()\n", + " policy_loss.backward()\n", + " actor_optimizer.step()\n", + "\n", + " critic.zero_grad()\n", + " critic_optimizer.zero_grad()\n", + " value_loss.backward()\n", + " critic_optimizer.step()\n", + "\n", + " if self.global_batch_step % self.tau_period == 0:\n", + " soft_update(target_actor, actor, self.tau)\n", + " soft_update(target_critic, critic, self.tau)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Training" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# data\n", + "batch_size = 64\n", + "epoch_size = int(1e3) * batch_size\n", + "buffer_size = int(1e5)\n", + "# runner settings, ~training\n", + "gamma = 0.99\n", + "tau = 0.01\n", + "tau_period = 1\n", + "# callback, ~exploration\n", + "session_period = 1\n", + "sigma = 0.3\n", + "# optimization\n", + "lr_actor = 1e-4\n", + "lr_critic = 1e-3\n", + "\n", + "# You can change game\n", + "# env_name = \"LunarLanderContinuous-v2\"\n", + "env_name = \"Pendulum-v0\"\n", + "env = NormalizedActions(gym.make(env_name))\n", + "replay_buffer = ReplayBuffer(buffer_size)\n", + "\n", + "actor, target_actor = get_network_actor(env), get_network_actor(env)\n", + "critic, target_critic = get_network_critic(env), get_network_critic(env)\n", + "utils.set_requires_grad(target_actor, requires_grad=False)\n", + "utils.set_requires_grad(target_critic, requires_grad=False)\n", + "\n", + "models = {\n", + " \"actor\": actor,\n", + " \"critic\": critic,\n", + " \"target_actor\": target_actor,\n", + " \"target_critic\": target_critic,\n", + "}\n", + "\n", + "criterion = torch.nn.MSELoss()\n", + "optimizer = {\n", + " \"actor\": torch.optim.Adam(actor.parameters(), lr_actor),\n", + " \"critic\": torch.optim.Adam(critic.parameters(), lr=lr_critic),\n", + "}\n", + "\n", + "loaders = {\n", + " \"train_game\": DataLoader(\n", + " ReplayDataset(replay_buffer, epoch_size=epoch_size), batch_size=batch_size,\n", + " ),\n", + "}\n", + "\n", + "runner = CustomRunner(gamma=gamma, tau=tau, tau_period=tau_period,)\n", + "\n", + "runner.train(\n", + " model=models,\n", + " criterion=criterion,\n", + " optimizer=optimizer,\n", + " loaders=loaders,\n", + " logdir=\"./logs_ddpg\",\n", + " num_epochs=10,\n", + " verbose=True,\n", + " valid_loader=\"_epoch_\",\n", + " valid_metric=\"v_reward\",\n", + " minimize_valid_metric=False,\n", + " load_best_on_end=True,\n", + " callbacks=[\n", + " GameCallback(\n", + " env=env,\n", + " replay_buffer=replay_buffer,\n", + " session_period=session_period,\n", + " sigma=sigma,\n", + " actor_key=\"actor\",\n", + " )\n", + " ],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Evaluating" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "env = 
gym.wrappers.Monitor(gym.make(env_name), directory=\"videos_ddpg\", force=True)\n", + "generate_sessions(env=env, network=runner.model[\"actor\"], num_sessions=100)\n", + "env.close()\n", + "\n", + "# show video\n", + "from IPython.display import HTML\n", + "import os\n", + "\n", + "video_names = list(filter(lambda s: s.endswith(\".mp4\"), os.listdir(\"./videos_ddpg/\")))\n", + "\n", + "HTML(\"\"\"\n", + "\n", + "\"\"\".format(\"./videos/\" + video_names[-1]))\n", + "# this may or may not be _last_ video. Try other indices" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# On-policy REINFORCE" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Imports" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Iterator, Optional, Sequence, Tuple\n", + "from collections import deque, namedtuple\n", + "\n", + "import gym\n", + "import numpy as np\n", + "import torch\n", + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "from torch.utils.data import DataLoader\n", + "from torch.utils.data.dataset import IterableDataset\n", + "\n", + "from catalyst import dl, utils" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## RL common" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "Rollout = namedtuple(\"Rollout\", field_names=[\"states\", \"actions\", \"rewards\",])\n", + "\n", + "\n", + "class RolloutBuffer:\n", + " def __init__(self, capacity: int):\n", + " self.capacity = capacity\n", + " self.buffer = deque(maxlen=capacity)\n", + "\n", + " def append(self, rollout: Rollout):\n", + " self.buffer.append(rollout)\n", + "\n", + " def sample(self, idx: int) -> Sequence[np.array]:\n", + " states, actions, rewards = self.buffer[idx]\n", + " states, actions, rewards = (\n", + " np.array(states, dtype=np.float32),\n", + " np.array(actions, dtype=np.int64),\n", + " np.array(rewards, dtype=np.float32),\n", + " )\n", + " return states, actions, rewards\n", + "\n", + " def __len__(self) -> int:\n", + " return len(self.buffer)\n", + "\n", + "\n", + "# as far as RL does not have some predefined dataset,\n", + "# we need to specify epoch length by ourselfs\n", + "class RolloutDataset(IterableDataset):\n", + " def __init__(self, buffer: RolloutBuffer):\n", + " self.buffer = buffer\n", + "\n", + " def __iter__(self) -> Iterator[Sequence[np.array]]:\n", + " for i in range(len(self.buffer)):\n", + " states, actions, rewards = self.buffer.sample(i)\n", + " yield states, actions, rewards\n", + " self.buffer.buffer.clear()\n", + "\n", + " def __len__(self) -> int:\n", + " return self.buffer.capacity" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## REINFORCE" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_cumulative_rewards(rewards, gamma=0.99):\n", + " G = [rewards[-1]]\n", + " for r in reversed(rewards[:-1]):\n", + " G.insert(0, r + gamma * G[0])\n", + " return G\n", + "\n", + "\n", + "def to_one_hot(y, n_dims=None):\n", + " \"\"\" Takes an integer vector and converts it to 1-hot matrix. 
\"\"\"\n", + " y_tensor = y\n", + " y_tensor = y_tensor.type(torch.LongTensor).view(-1, 1)\n", + " n_dims = n_dims if n_dims is not None else int(torch.max(y_tensor)) + 1\n", + " y_one_hot = torch.zeros(y_tensor.size()[0], n_dims).scatter_(1, y_tensor, 1)\n", + " return y_one_hot\n", + "\n", + "\n", + "def get_action(env, network: nn.Module, state: np.array, epsilon: float = -1) -> int:\n", + " state = torch.tensor(state[None], dtype=torch.float32)\n", + " logits = network(state).detach()\n", + " probas = F.softmax(logits, -1).cpu().numpy()[0]\n", + " action = np.random.choice(len(probas), p=probas)\n", + " return int(action)\n", + "\n", + "\n", + "def generate_session(\n", + " env,\n", + " network: nn.Module,\n", + " t_max: int = 1000,\n", + " epsilon: float = -1,\n", + " rollout_buffer: Optional[RolloutBuffer] = None,\n", + ") -> Tuple[float, int]:\n", + " total_reward = 0\n", + " states, actions, rewards = [], [], []\n", + " state = env.reset()\n", + "\n", + " for t in range(t_max):\n", + " action = get_action(env, network, state=state, epsilon=epsilon)\n", + " next_state, reward, done, _ = env.step(action)\n", + "\n", + " # record session history to train later\n", + " states.append(state)\n", + " actions.append(action)\n", + " rewards.append(reward)\n", + "\n", + " total_reward += reward\n", + " state = next_state\n", + " if done:\n", + " break\n", + " if rollout_buffer is not None:\n", + " rollout_buffer.append(Rollout(states, actions, rewards))\n", + "\n", + " return total_reward, t\n", + "\n", + "\n", + "def generate_sessions(\n", + " env,\n", + " network: nn.Module,\n", + " t_max: int = 1000,\n", + " epsilon: float = -1,\n", + " rollout_buffer: Optional[RolloutBuffer] = None,\n", + " num_sessions: int = 100,\n", + ") -> Tuple[float, int]:\n", + " sessions_reward, sessions_steps = 0, 0\n", + " for i_episone in range(num_sessions):\n", + " r, t = generate_session(\n", + " env=env, network=network, t_max=t_max, epsilon=epsilon, rollout_buffer=rollout_buffer,\n", + " )\n", + " sessions_reward += r\n", + " sessions_steps += t\n", + " return sessions_reward, sessions_steps" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_network(env, num_hidden: int = 128):\n", + " inner_fn = utils.get_optimal_inner_init(nn.ReLU)\n", + " outer_fn = utils.outer_init\n", + "\n", + " network = torch.nn.Sequential(\n", + " nn.Linear(env.observation_space.shape[0], num_hidden),\n", + " nn.ReLU(),\n", + " nn.Linear(num_hidden, num_hidden),\n", + " nn.ReLU(),\n", + " )\n", + " head = nn.Linear(num_hidden, env.action_space.n)\n", + "\n", + " network.apply(inner_fn)\n", + " head.apply(outer_fn)\n", + "\n", + " return torch.nn.Sequential(network, head)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Catalyst" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class GameCallback(dl.Callback):\n", + " def __init__(self, *, env, rollout_buffer: RolloutBuffer):\n", + " super().__init__(order=0)\n", + " self.env = env\n", + " self.rollout_buffer = rollout_buffer\n", + "\n", + " def on_epoch_start(self, runner: dl.IRunner):\n", + " self.actor = runner.model\n", + "\n", + " self.actor.eval()\n", + " generate_sessions(\n", + " env=self.env, network=self.actor, rollout_buffer=self.rollout_buffer, num_sessions=100,\n", + " )\n", + " self.actor.train()\n", + "\n", + " def on_epoch_end(self, runner: dl.IRunner):\n", + " num_sessions = 100\n", + "\n", + " 
self.actor.eval()\n",
+    "        valid_rewards, valid_steps = generate_sessions(\n",
+    "            env=self.env, network=self.actor, num_sessions=num_sessions\n",
+    "        )\n",
+    "        self.actor.train()\n",
+    "\n",
+    "        valid_rewards /= num_sessions\n",
+    "        runner.epoch_metrics[\"_epoch_\"][\"v_reward\"] = valid_rewards"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class CustomRunner(dl.Runner):\n",
+    "    def __init__(\n",
+    "        self, *, gamma: float, **kwargs,\n",
+    "    ):\n",
+    "        super().__init__(**kwargs)\n",
+    "        self.gamma: float = gamma\n",
+    "        self._initialized = False\n",
+    "\n",
+    "    def handle_batch(self, batch: Sequence[np.array]):\n",
+    "        # model train/valid step\n",
+    "        # ATTENTION:\n",
+    "        # because of different trajectory lengths\n",
+    "        # ONLY batch_size==1 is supported\n",
+    "        states, actions, rewards = batch\n",
+    "        states, actions, rewards = states[0], actions[0], rewards[0]\n",
+    "        cumulative_returns = torch.tensor(get_cumulative_rewards(rewards, self.gamma))\n",
+    "        network = self.model\n",
+    "\n",
+    "        logits = network(states)\n",
+    "        probas = F.softmax(logits, -1)\n",
+    "        logprobas = F.log_softmax(logits, -1)\n",
+    "        n_actions = probas.shape[1]\n",
+    "        logprobas_for_actions = torch.sum(logprobas * to_one_hot(actions, n_dims=n_actions), dim=1)\n",
+    "\n",
+    "        # REINFORCE surrogate objective: maximize E[log pi(a|s) * G_t] plus an entropy bonus\n",
+    "        J_hat = torch.mean(logprobas_for_actions * cumulative_returns)\n",
+    "        entropy_reg = -torch.mean(torch.sum(probas * logprobas, dim=1))\n",
+    "        loss = -J_hat - 0.1 * entropy_reg\n",
+    "\n",
+    "        self.batch_metrics.update({\"loss\": loss})\n",
+    "        if self.is_train_loader:\n",
+    "            loss.backward()\n",
+    "            self.optimizer.step()\n",
+    "            self.optimizer.zero_grad()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Training"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "batch_size = 1\n",
+    "epoch_size = int(1e3) * batch_size\n",
+    "buffer_size = int(1e2)\n",
+    "# runner settings\n",
+    "gamma = 0.99\n",
+    "# optimization\n",
+    "lr = 3e-4\n",
+    "\n",
+    "# env_name = \"LunarLander-v2\"\n",
+    "env_name = \"CartPole-v1\"\n",
+    "env = gym.make(env_name)\n",
+    "rollout_buffer = RolloutBuffer(buffer_size)\n",
+    "\n",
+    "model = get_network(env)\n",
+    "optimizer = torch.optim.Adam(model.parameters(), lr=lr)\n",
+    "loaders = {\n",
+    "    \"train_game\": DataLoader(RolloutDataset(rollout_buffer), batch_size=batch_size,),\n",
+    "}\n",
+    "\n",
+    "runner = CustomRunner(gamma=gamma)\n",
+    "runner.train(\n",
+    "    model=model,\n",
+    "    optimizer=optimizer,\n",
+    "    loaders=loaders,\n",
+    "    logdir=\"./logs_reinforce\",\n",
+    "    num_epochs=10,\n",
+    "    verbose=True,\n",
+    "    valid_loader=\"_epoch_\",\n",
+    "    valid_metric=\"v_reward\",\n",
+    "    minimize_valid_metric=False,\n",
+    "    load_best_on_end=True,\n",
+    "    callbacks=[GameCallback(env=env, rollout_buffer=rollout_buffer,)],\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Evaluating"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "env = gym.wrappers.Monitor(gym.make(env_name), directory=\"videos_reinforce\", force=True)\n",
+    "generate_sessions(env=env, network=model, num_sessions=100)\n",
+    "env.close()\n",
+    "\n",
+    "from IPython.display import HTML\n",
+    "import os\n",
+    "\n",
+    "video_names = list(filter(lambda s: s.endswith(\".mp4\"), os.listdir(\"./videos_reinforce/\")))\n",
+    "\n",
+    "HTML(\"\"\"\n",
+    "\n",
+    "\"\"\".format(\"./videos_reinforce/\" + video_names[-1]))\n",
+    "# 
this may or may not be _last_ video. Try other indices" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:py37] *", + "language": "python", + "name": "conda-env-py37-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/reinforcement_learning/README.md b/examples/reinforcement_learning/README.md index 3320740b33..d658a1f8ca 100644 --- a/examples/reinforcement_learning/README.md +++ b/examples/reinforcement_learning/README.md @@ -1,6 +1,6 @@ # Requirements ```bash -pip install catalyst gym +pip install catalyst==21.04.2 gym==0.18.0 ``` # Run diff --git a/examples/reinforcement_learning/ddpg.py b/examples/reinforcement_learning/ddpg.py index 5cde4e1ff8..9ba52f4b2d 100644 --- a/examples/reinforcement_learning/ddpg.py +++ b/examples/reinforcement_learning/ddpg.py @@ -1,3 +1,4 @@ +# flake8: noqa from typing import Iterator, Optional, Sequence, Tuple from collections import deque, namedtuple diff --git a/examples/reinforcement_learning/dqn.py b/examples/reinforcement_learning/dqn.py index 6514752424..b8d3933abb 100644 --- a/examples/reinforcement_learning/dqn.py +++ b/examples/reinforcement_learning/dqn.py @@ -1,3 +1,4 @@ +# flake8: noqa from typing import Iterator, Optional, Sequence, Tuple from collections import deque, namedtuple @@ -348,9 +349,9 @@ def handle_batch(self, batch: Sequence[np.array]): # # show video # from IPython.display import HTML # import os - # + # # video_names = list(filter(lambda s: s.endswith(".mp4"), os.listdir("./videos_dqn/"))) - # + # # HTML(""" #